The Microsoft Azure cloud computing platform is often criticised for a Python API that offers little customisation and is poorly documented. Still, it is a popular choice for many companies, so data scientists need to squeeze the maximum of features and performance out of it rather than complain. Below I share my thoughts on building robust Extract-Transform-Load (ETL) processes in this setup.
Extract from the database: producer-consumer!
The key trick is to follow the producer-consumer pattern: process your data in batches, for the sake of robustness and efficiency. The popular pandas library, dedicated to tabular data, is not enough for this task, as it hides the useful low-level features. Gain more control by using a dedicated database driver, e.g. the psycopg2 driver for the popular Postgres database.
import psycopg2

# connection credentials (db_user etc.) are placeholders defined elsewhere
dsn = f'user={db_user} password={db_password} dbname={db_name} host={db_host}'
conn = psycopg2.connect(dsn)

query = """
SELECT *
FROM sales
WHERE date > '2021'
"""


def process_rows(row_group):
    '''do your stuff, e.g. filter and append to a file'''
    pass


n_prefetch = 10000

# mind the server-side cursor: naming the cursor keeps the result set on the
# server, so rows are streamed in batches instead of loaded into memory at once
with conn.cursor(name='server_side_cursor') as cur:
    cur.itersize = n_prefetch
    cur.execute(query)
    while True:
        row_group = cur.fetchmany(n_prefetch)
        if len(row_group) > 0:
            process_rows(row_group)
        else:
            break
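As an illustration of process_rows, here is a minimal sketch that simply appends each batch to a CSV file; the output file name and the absence of any filtering are assumptions, adjust to your pipeline:

import csv

def process_rows(row_group, out_path="sales_dump.csv"):
    # append the raw row tuples to a CSV file; add filtering/cleaning as needed
    with open(out_path, "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(row_group)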
Cast datatypes
Cast datatypes early and explicitly, keeping these points in mind:
- avoid data type inference where possible, because it can be very slow and is not 100% accurate
- align datatypes with your destination format; for example, pandas works better with date and time combined into a datetime (due to the rich features of the latter), while Apache Parquet benefits from consistent numerical precision because data is stored in column chunks.
It is best to do this casting upstream, at the database-driver level, like in this example:
import psycopg2

# cast some data types upon receiving from the database:
# return DATE columns as Python datetime objects
datetype_casted = psycopg2.extensions.new_type(
    psycopg2.extensions.DATE.values, "date", psycopg2.DATETIME
)
psycopg2.extensions.register_type(datetype_casted)

# return DECIMAL columns as floats rather than Decimal objects
decimal_casted = psycopg2.extensions.new_type(
    psycopg2.extensions.DECIMAL.values, "decimal", psycopg2.extensions.FLOAT
)
psycopg2.extensions.register_type(decimal_casted)
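If the driver cannot be adapted, or the data already sits in a DataFrame, the same principle applies at the pandas level. A minimal sketch, with hypothetical column names:

import pandas as pd

# hypothetical columns; adjust to your schema
df = pd.DataFrame({"sale_date": ["2021-03-01", "2021-03-02"], "amount": ["9.99", "12.50"]})

# cast explicitly instead of relying on type inference
df["sale_date"] = pd.to_datetime(df["sale_date"])
df["amount"] = df["amount"].astype("float64")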
Use Parquet to store tabular data
Apache Parquet is invaluable for efficient work with large tabular data. There are two major drivers for Python: pyarrow and fastparquet. The first one integrates with Azure data flows (e.g. you can stream and filter data), although its support for more sophisticated data types such as timedelta has been limited. Remember that writing Parquet incurs a memory overhead, so it is better to do it in batches. The relevant code may look as below:
import os

import pandas as pd


def sql_to_parquet(
    conn, query, column_names, target_dir, n_prefetch=1000000, **parquet_kwargs
):
    """Writes the result of a SQL query to Parquet files (in chunks).

    Args:
        conn: psycopg2 connection object (must be open)
        query: SQL query of "select" type
        column_names: column names given to the resulting SQL table
        target_dir: local directory where Parquet is written to; must exist, data is overwritten
        n_prefetch: chunk of SQL data processed (read from SQL and dumped to Parquet) at a time. Defaults to 1000000.
        parquet_kwargs: extra keyword arguments passed to pandas.DataFrame.to_parquet
    """
    with conn.cursor(name="server_side_cursor") as cur:
        # start query
        cur.itersize = n_prefetch
        cur.execute(query)
        # set up consumer
        chunk = 0
        # consume until stream is empty
        while True:
            # get and process one batch
            row_group = cur.fetchmany(n_prefetch)
            chunk += 1
            if len(row_group) > 0:
                out = pd.DataFrame(data=row_group, columns=column_names)
                fname = os.path.join(target_dir, f"part_{chunk:04d}.parquet")
                out.to_parquet(fname, engine="pyarrow", **parquet_kwargs)
            else:
                break


def df_to_parquet(df, target_dir, chunk_size=100000, **parquet_kwargs):
    """Writes a pandas DataFrame to Parquet format with pyarrow (in chunks).

    Args:
        df: pandas DataFrame
        target_dir: local directory where Parquet files are written to
        chunk_size: number of rows stored in one chunk of a Parquet file. Defaults to 100000.
        parquet_kwargs: extra keyword arguments passed to pandas.DataFrame.to_parquet
    """
    for i in range(0, len(df), chunk_size):
        slc = df.iloc[i : i + chunk_size]
        chunk = int(i / chunk_size)
        fname = os.path.join(target_dir, f"part_{chunk:04d}.parquet")
        slc.to_parquet(fname, engine="pyarrow", **parquet_kwargs)
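To illustrate the streaming-and-filtering point made above: once the chunked files are written, pyarrow can read the whole directory back as a single dataset and push a filter down to the reader. A minimal sketch, assuming a local output directory and a hypothetical numeric column price:

import pyarrow.parquet as pq

# read all part_*.parquet files in the directory as one table,
# skipping row groups that cannot match the filter
table = pq.read_table("path/to/parquet_dir", filters=[("price", ">", 100)])
df = table.to_pandas()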
Leverage Azure API properly!
The Azure API is both cryptic in its documentation and under-explained in data science blogs. Below I share a complete recipe for uploading and registering Parquet data with minimum effort. While uploading makes the data persist, registering enables further Azure features (such as pipelining). It is best to dump your Parquet file to a directory (large Parquet files should be chunked) with tempfile, then use the Azure Dataset class to both upload it to the storage (use a universally unique identifier for reproducibility and to avoid path collisions) and register it in the workspace (use the Tabular subclass). Use the Workspace and Datastore classes to facilitate the interaction with Azure.
from azureml.core import Workspace, Dataset
import tempfile
from uuid import uuid4
import psycopg2

ws = Workspace.from_config()
dstore = ws.datastores.get("my_datastore")

# conn, query, query_cols and tab are assumed to be defined earlier
# (database connection, SQL query, its column names and the source table name)
with tempfile.TemporaryDirectory() as tempdir:
    # dump the query result to chunked Parquet files in a temporary directory
    sql_to_parquet(conn, query, query_cols, tempdir)
    # a unique path on the datastore avoids collisions between uploads
    target = (dstore, f"raw/{tab}/{str(uuid4())}")
    Dataset.File.upload_directory(tempdir, target)
    # register a tabular dataset pointing at the uploaded Parquet files
    ds = Dataset.Tabular.from_parquet_files(target)
    ds.register(
        ws,
        name=f"dataset_{tab}",
        description="created from Sales table",
        tags={"JIRA": "ticket-01"},
        create_new_version=True,
    )
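Once registered, the dataset can be retrieved by name in any other script or pipeline step. A minimal sketch, assuming the table above was sales so the dataset was registered as dataset_sales:

from azureml.core import Workspace, Dataset

ws = Workspace.from_config()

# fetch the latest registered version and materialise it as a DataFrame
ds = Dataset.get_by_name(ws, name="dataset_sales")
df = ds.to_pandas_dataframe()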