
Building an ETL flow with asyncio, multiprocessing and asyncpg

This post will explain how to implement a concurrent ETL (Extract, Transform, Load) flow combining Python asyncio with multiprocessing to get the best of both worlds. ETL itself is a procedure that starts with data extraction from sources such as a database (or many databases). Following extraction, we apply functions to the data in order to modify it; these transformation functions are generally used for cleaning or converting the extracted data. This flow is particularly useful for data warehousing.

Database operations are a core part of many applications. Many of these procedures deal with tables containing millions of rows, which can hugely impact the performance of said applications. With Python, we’d most commonly use the psycopg2 library to perform operations on a PostgreSQL database.
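For reference, a plain, blocking version of the extraction step with psycopg2 might look like the sketch below (the connection parameters are placeholders, mirroring the dummy values used later in this post):

import psycopg2

def fetch_from_db():
  # Blocking fetch with psycopg2; placeholder connection parameters.
  connection = psycopg2.connect(
    user="user_name", password="password", dbname="db_name", host="host", port="port"
  )
  with connection:
    with connection.cursor() as cursor:
      cursor.execute("SELECT name, age FROM input_table")
      rows = cursor.fetchall()  # blocks until every row has been fetched
  connection.close()
  return rows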

To speed up ETL operations, we can benefit from concurrent code. Python provides us with asyncio, a built-in library that allows developers to write concurrent code without dealing with low-level details such as thread management. In asyncio, functions that are executed concurrently are called coroutines. For our ETL context, we’ll wrap those coroutines into tasks and wait for all of them to be done with the asyncio.gather function. Its behavior means our code waits for every concurrent task to complete before it can advance, similarly to the barriers used to synchronize threads running concurrently.
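As a minimal, self-contained illustration of that barrier-like behavior (the wait_and_return coroutine and its delays are made up for this example, not part of the ETL code):

import asyncio

async def wait_and_return(delay):
  # Simulates an I/O-bound operation by sleeping.
  await asyncio.sleep(delay)
  return delay

async def demo():
  # gather only returns once every task has completed.
  results = await asyncio.gather(
    asyncio.create_task(wait_and_return(1)),
    asyncio.create_task(wait_and_return(2)),
  )
  print(results)  # [1, 2]

asyncio.run(demo())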

For our context, we are going to implement the following steps to model ETL:

  1. Fetch records from a database (extract), each row having a name and an age;
  2. Perform an operation over the obtained data (transform) to split the name from each record into first_name and last_name, while keeping the age value unaltered;
  3. Write the transformed data into a database (load).

Since psycopg2 doesn’t support asyncio, we’ll use asyncpg, one of the available libraries that implement asynchronous database operations, for extract and load, the steps that deal with database access.

According to the asyncpg developers, the library is on average three times faster than psycopg2. It also provides tools to perform database operations, such as prepared statements and cursors. There are, however, some missing features, such as a built-in mechanism to prevent SQL injection (there’s a trick to implement it, though, and we will discuss it later in the text).
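As a quick taste of the asyncpg API, a prepared statement could be used roughly as in the sketch below (the query and the age filter are illustrative assumptions; the connection parameters are the same dummy values used elsewhere in this post):

import asyncpg

async def count_adults():
  # Placeholder connection parameters, as elsewhere in this post.
  connection = await asyncpg.connect(
    user="user_name", password="password", database="db_name", host="host", port="port"
  )
  # Prepare the statement once, then reuse it with different arguments.
  statement = await connection.prepare(
    "SELECT count(*) FROM input_table WHERE age >= $1"
  )
  total = await statement.fetchval(18)
  await connection.close()
  return total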

Synchronizing all tasks

In order to implement an ETL flow, we must provide functions to represent each stage. In non-concurrent code, we’d have functions such as fetch_from_db, transform_data and write_into_db that would be called sequentially. In a concurrent run, we want each stage to run concurrently, starting computation as soon as there’s data to work with and handing data over to the next stage as soon as its own work on that data is done.

To establish this kind of synchronization between stages, one approach is to implement a producer & consumer architecture, with a queue being used to pass data between them.

We begin structuring our code by writing the etl function:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

async def etl():
  with ProcessPoolExecutor(
    max_workers=multiprocessing.cpu_count(),
  ) as pool:
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue(maxsize=1000)
    await asyncio.gather(
      asyncio.create_task(producer(queue)),
      asyncio.create_task(consumer(loop, pool, queue)),
      return_exceptions=False,
    )

def main():
  asyncio.run(etl())

Firstly, our etl function must be asynchronous (through the async keyword) in order to fully work with asyncio.

We'll be using a ProcessPoolExecutor to perform our transformation procedures. This allows us to avoid halting the event loop while processing data. Setting the number of workers in the pool to the number of CPUs our machine has helps speed things up by fully using the available resources. The usage of these structures will be further explained in a later section.

Notice that when instantiating our queue, we’ve used the maxsize parameter. Why do we do that?

First of all, an unbounded queue risks depleting memory resources. Secondly, producer would keep filling the queue until it ran out of data, and only then would consumer start running. That may end up as effectively synchronous code, since your consumer would only start when producer had nothing else to do.

Defining a queue size limit makes consumer start earlier, as soon as the queue reaches the set limit for the first time. When space is freed up, producer continues its job and adds more elements to the queue, making the code run concurrently. This means the queue is acting as a backpressure mechanism. It’s beneficial to have such a mechanism as long as the production speed is faster than the consumption speed, which happens in many cases. For more details regarding these issues, please check this article.

main will be responsible for invoking etl. While main itself is not asynchronous, it uses asyncio.run to execute an asynchronous function. After etl is over, any code after asyncio.run(etl()) would be executed as regular synchronous code.
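To run the flow as a script, all that's left is the usual entry-point guard:

if __name__ == "__main__":
  main()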

Producer has a fairly straightforward implementation:

async def producer(queue):
  connection = await get_asyncpg_connection()
  async for record in extract(connection):
    await queue.put(record)
  await queue.put(None)
  await connection.close()

We start by grabbing an asyncpg connection object. Once obtained, we loop over the records that the extract function fetches from our database. Each record is put into queue so it can later be processed by consumer, until there is no more data to fetch. To signal that its job is done, producer then puts a None value into the queue, which must be handled gracefully by consumer. Lastly, we close the asyncpg connection.

Let’s dive into the implementations of both the get_asyncpg_connection and extract functions.

import asyncpg

async def get_asyncpg_connection():
  db_conf = {
    "user": "user_name",
    "password": "password",
    "database": "db_name",
    "host": "host",
    "port": "port",
  }
  connection = await asyncpg.connect(**db_conf)
  return connection

async def extract(connection):
  async with connection.transaction():
    query = "SELECT name, age FROM input_table"
    async for record in connection.cursor(query, prefetch=1000):
      yield record

get_asyncpg_connection is the only part of the code that explicitly interacts with the asyncpg module. It creates a connection with our database using the asyncpg.connect method, whose parameters are quite similar to the ones found in psycopg2.connect. To simplify this post, we opted to instantiate a db_conf variable inside get_asyncpg_connection, using dummy values for the connection parameters. When adapting this code to a real context, one would simply replace them with real values.

The extract function is an asynchronous generator that receives an open asyncpg connection as its single parameter. It’s responsible for accessing a table in our database (in our case, the input_table table) via the connection’s cursor method, which fetches records from the database according to the query defined in extract, and it yields each fetched row.

Now it’s time to talk about the consumer implementation:

def transform(batch):
  transformed_batch = []
  for record in batch:
    first_name, last_name = record["name"].split(" ", 1)
    age = record["age"]
    transformed_batch.append((first_name, last_name, age))
  return transformed_batch

async def load(batch, connection):
  async with connection.transaction():
    columns = ("first_name", "last_name", "age")
    await connection.copy_records_to_table(
      "output_table", records=batch, columns=columns
    )

async def task_set_load_helper(task_set, connection):
  for future in task_set:
    await load(await future, connection)

async def consumer(loop, pool, queue):
  connection = await get_asyncpg_connection()
  task_set = set()
  batch = []
  while True:
    record = await queue.get()
    if record is not None:
      # asyncpg.Record objects can't be pickled, so convert to dict
      # before the batch is sent to another process.
      record = dict(record)
      batch.append(record)
    if queue.empty():
      # Offload the CPU-bound transformation to the process pool.
      task = loop.run_in_executor(pool, transform, batch)
      task_set.add(task)
      if len(task_set) >= pool._max_workers:
        # Too many pending transformations: wait until at least one
        # finishes and load its results before building new batches.
        done_set, task_set = await asyncio.wait(
          task_set, return_when=asyncio.FIRST_COMPLETED
        )
        await task_set_load_helper(done_set, connection)
      batch = []
    if record is None:
      # The producer signaled there is no more data to fetch.
      break
  if task_set:
    # Load the results of the remaining transformations as they complete.
    await task_set_load_helper(
      asyncio.as_completed(task_set), connection
    )
  await connection.close()

The consumer function is responsible for grabbing the records our producer stores in the queue and adding them to a batch. We must remember to convert each record into a dict before adding it to the batch in order to process the data later - not doing so would raise a TypeError: can't pickle asyncpg.Record objects exception. Whenever the queue becomes empty, consumer starts running the transformation step over the batch it built.

The transform function receives a batch from the consumer, which is nothing more than a list of dict objects fetched from the database. Since we want to split the full names from each record into first and last names, we start by looping over the batch and grabbing every record. Our context is simple, so the transformation is pretty straightforward. After obtaining the transformed data we create a tuple containing each value and finally add it to a list representing a modified batch.

As applications become more complex and involve multiple output tables, it's recommended to keep track of which table each record should be inserted into, along with that table's columns. This can be achieved by using a dict to group the records for each table, as sketched below. Since our use case involves a single output table, using a list of tuple objects is enough.
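A minimal sketch of that idea, assuming a hypothetical second output table (audit_table) and its columns, could look like this:

# Hypothetical mapping from each output table to its columns.
COLUMNS_BY_TABLE = {
  "output_table": ("first_name", "last_name", "age"),
  "audit_table": ("full_name",),
}

def transform(batch):
  # Group the transformed rows by destination table.
  grouped = {table: [] for table in COLUMNS_BY_TABLE}
  for record in batch:
    first_name, last_name = record["name"].split(" ", 1)
    grouped["output_table"].append((first_name, last_name, record["age"]))
    grouped["audit_table"].append((record["name"],))
  return grouped

async def load(grouped, connection):
  async with connection.transaction():
    for table, records in grouped.items():
      await connection.copy_records_to_table(
        table, records=records, columns=COLUMNS_BY_TABLE[table]
      )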

When we call the run_in_executor method, our transformation step runs inside a given executor (in our case it runs in pool, the ProcessPoolExecutor instance we created in etl). Since the transformation function is CPU-bound, executing transform(batch) directly here would halt the other coroutines until the batch had been fully processed, dragging out the execution time.

Moreover, run_in_executor returns a Future object, not a processed batch. According to the Python docs, “a Future represents an eventual result of an asynchronous operation”, meaning that our data won’t necessarily be ready when run_in_executor returns, but will be at some point in the future. This means transform will be running in the background, so we can move on and process the next batches concurrently.

The conditional block that compares len(task_set) with pool._max_workers prevents the application from running out of memory by accumulating too many transform results. Whenever task_set holds at least as many objects as our pool has available workers, we can speed things up by processing some tasks to obtain transformation results earlier. Since they’re Future objects, we must wait for them to be completed. After calling asyncio.wait, we wait for at least one task to become ready. When this happens, we grab a new done_set containing all completed tasks and update task_set to keep track only of the pending ones.

As we now have completed tasks, we can store their results in the database. Since we are going to use our storing procedure elsewhere, we created the task_set_load_helper helper function to avoid code repetition. It’s responsible for iterating over a set of completed tasks and calling the load function that effectively writes into the database.

Whenever load is called from within task_set_load_helper, it receives a list with transformed data and stores it into output_table in our database. To achieve this, we use the copy_records_to_table method from the asyncpg connection object to bulk insert the records, which is faster than inserting each record individually. It receives a table to write data into and a list of tuple objects representing the records to insert. It may additionally receive an iterable with the table’s columns. It’s important to note that the order of the columns in this iterable must match the order of the values in each tuple from the records parameter.

When there are no more records to process, we finish creating batches and move on to obtain the last transformation results, in case there are still some objects to work with. We perform a procedure similar to the earlier call to task_set_load_helper, but now we have to pass the entire set of remaining tasks, which may still be pending. That means we must apply the asyncio.as_completed function over task_set to ensure that we iterate exclusively over completed tasks, as soon as they become available.

Caveats

SQL injection is one of the main concerns when using asyncpg. With psycopg2, we can avoid this issue by composing our queries using objects from the psycopg2.sql module, as in the example below:

from psycopg2 import sql

table_name = "input_table"
column_name = "name"
value = "foo"

query = sql.SQL(
  "SELECT * FROM {table_name} WHERE {column_name}={value}"
).format(
  table_name=sql.Identifier(table_name),
  column_name=sql.Identifier(column_name),
  value=sql.Literal(value)
)

By using sql.Identifier and sql.Literal we can sanitize identifiers (i.e., table and column names) and literals (i.e., strings and integers), respectively, to build parameterized queries, while being assured that the resulting query won’t harm our database due to a maliciously crafted input.

asyncpg allows us to build parameterized queries with a syntax similar to native PostgreSQL’s, using $n placeholders to provide query arguments. However, since PostgreSQL queries don’t allow us to parameterize table and column names, we are stuck with Python’s string formatting to compose queries with dynamic identifiers. Without proper care, this can lead to catastrophic results if these identifiers are user-provided: a malicious user could send a value to our system that, when processed, leads to a DROP DATABASE statement being executed. The snippet below illustrates the difference.
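In this hedged sketch (the table and column names are assumptions), a value passed through a $n placeholder is escaped by asyncpg, while an identifier interpolated with plain string formatting is exactly the dangerous spot:

async def filter_by_name(connection, user_value, user_table):
  # Safe: the literal value is passed as a query argument ($1),
  # so asyncpg escapes it for us.
  safe_rows = await connection.fetch(
    "SELECT name, age FROM input_table WHERE name = $1", user_value
  )
  # Dangerous: the identifier is interpolated as a plain string and is NOT
  # escaped; a malicious user_table could inject arbitrary SQL.
  unsafe_rows = await connection.fetch(f"SELECT name, age FROM {user_table}")
  return safe_rows, unsafe_rows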

Build safe systems. Keep the user profile in mind whenever designing permissions: allowing admins to dynamically choose tables is sound, but granting an end user the same privilege may compromise your database.

In order to work around these issues, we can look at how psycopg2 works behind the scenes with sql.Identifier.

async def sanitize_identifier(identifier, connection):
  sanitized = await connection.fetchval("SELECT quote_ident($1)", identifier)
  return sanitized

async def sanitize_table(table, connection):
  try:
    dirty_schema, dirty_table = table.split(".")
    sanitized_schema = await sanitize_identifier(dirty_schema, connection)
    sanitized_table = await sanitize_identifier(dirty_table, connection)
    sanitized = f"{sanitized_schema}.{sanitized_table}"
  except ValueError:
    sanitized = await sanitize_identifier(table, connection)
  return sanitized

The magic happens in sanitize_identifier, where we use the database connection (an asyncpg connection object) to retrieve the result of the quote_ident PostgreSQL function, which helps us sanitize the input. Notice that, because the identifier is passed to that query as a literal value rather than as an identifier, we can use the $n syntax to provide the dynamic value.

sanitize_table builds upon the previous function in order to cover situations where our table name is prefixed by a schema name. We have to sanitize both components separately, not as a single string. After cleaning, we can safely merge them again into a single value using Python’s formatting.

Below is an example of a more robust extract function, which allows us to fetch data from our database while filtering the results by a dynamic table and column.

async def extract(table_name, column_name, value, connection):
  async with connection.transaction():
    sanitized_table_name = await sanitize_table(table_name, connection)
    sanitized_column_name = await sanitize_identifier(column_name, connection)
    query = f"SELECT * FROM {sanitized_table_name} WHERE {sanitized_column_name}=$1"
    async for record in connection.cursor(query, value):
      yield dict(record)

More info

Below are some third-party solutions that implement approaches related to our topic:

  • aiomultiprocess - A Python lib that also combines asyncio and multiprocessing and implements a solution quite similar to ours. Here's also a PyCon 2018 talk that introduces this library.
  • pypeln - A Python lib that abstracts the process when creating a concurrent data pipeline.

Special thanks to Rafael Carício who kindly reviewed this post.

Renato Vieira

Fullstack Developer at Vinta Software.