Building an ETL flow with asyncio, multiprocessing and asyncpg

Renato Vieira
March 31, 2021
<p>This post will explain how to implement a concurrent ETL (<strong>E</strong>xtract, <strong>T</strong>ransform, <strong>L</strong>oad) flow combining Python <code>asyncio</code> with <code>multiprocessing</code> to get the best of both worlds. ETL itself is a procedure that starts with data extraction from sources such as a database (or many databases). Following extraction, we apply functions to the data in order to modify it. These transformation functions are generally used for cleaning or converting extracted data. This flow is particularly profitable for data warehousing.</p><p>Operations with databases are core parts of many applications. Many of these procedures deal with tables containing millions of rows, which can hugely impact the performance of said applications. With Python we’d most commonly use the <code>psycopg2</code> library to perform operations in a PostgreSQL database.</p><p>To speed up ETL operations, we can benefit from concurrent code. Python provides us with <a href=""><code>asyncio</code></a>, a built-in library which allows developers to write concurrent code without dealing with low-level programming; thread management, for instance. On <code>asyncio</code>, functions that are concurrently executed are called coroutines. For our ETL context, we’ll wrap those coroutines into tasks and wait for all of them to be done with <code>asyncio.gather</code> function. Its behavior means our code waits for every concurrent task to be completed before it can advance, similarly to the barriers used to synchronize the execution of threads running concurrently.</p><p>For our context, we are going to implement the following steps to model ETL:</p><ol><li>Fetch records from a database (<code>extract</code>), each row having a <code>name</code> and an <code>age</code>;</li><li>Perform an operation over the obtained data (<code>transform</code>) to split the <code>name</code> from each record into <code>first_name</code> and <code>last_name</code>, while keeping the <code>age</code> value unaltered;</li><li>Write the transformed data into a database (<code>load</code>).</li></ol><p>Since <code>psycopg2</code> doesn’t support <code>asyncio</code>, one of the available libraries that implement asynchronous database operations is <a href=""><code>asyncpg</code></a>. We’ll use it to implement extract and load, steps that deal with database access.</p><p>According to <code>asyncpg</code> developers, <a href="">the library on average is three times faster than psycopg2</a>. It also provides tools to perform database operations, such as prepared statements and cursors. There are however some missing features, such as a built-in mechanism to prevent SQL injection (there’s a trick to implement it though, and we will discuss it later in the text).</p><h1 id="synchronizing-all-tasks">Synchronizing all tasks</h1><p>In order to implement an ETL flow, we must provide functions to represent each stage. In a non-concurrent code, we’d have functions such as <code>fetch_from_db</code>, <code>transform_data</code> and <code>write_into_db</code> that would be called sequentially. In a concurrent run, we want each function to run concurrently, starting computation as soon as there’s data to work with while also providing data to the next stage whenever the previous function’s job is done.</p><p>To establish this kind of synchronization between each stage, one approach is to implement a consumer &amp; producer architecture with one queue being used to pass data between them.</p><p>We begin structuring our code writing the <code>etl</code> function:</p><pre><code>import asyncio import multiprocessing from concurrent.futures import ProcessPoolExecutor async def etl(): with ProcessPoolExecutor( max_workers=multiprocessing.cpu_count(), ) as pool: loop = asyncio.get_running_loop() queue = asyncio.Queue(maxsize=1000) await asyncio.gather( asyncio.create_task(producer(queue)), asyncio.create_task(consumer(loop, pool, queue)), return_exceptions=False, ) def main():</code></pre><p>Firstly, our <code>etl</code> function must be asynchronous (through the <code>async</code> keyword) in order to fully work with <code>asyncio</code>.</p><p>We'll be using a <code>ProcessPoolExecutor</code> to perform our transformation procedures. This allows us to not halt our execution when processing data. Setting the number of workers in the pool to all CPUs our machine has will help to speed things up by fully using the available resources. The usage of these structures will be further explained in a later section.</p><p>Notice that when instantiating our queue, we’ve used the <code>maxsize</code> parameter. Why do we do that?</p><p>First of all, using an unbounded queue has the risk of depleting memory resources. Secondly, producer would fill the queue indefinitely and only then consumer would start running. That may end up with synchronous code, as your consumer would only start when producer had nothing else to do.</p><p>Defining a queue size limit makes consumer start earlier, as soon as the queue reaches the set limit for the first time. When space is freed up, producer will continue its job and add more elements to the queue, making the code run concurrently. This means the queue is acting as a <a href="">backpressure mechanism</a>. It’s positive to have such mechanism as long as the production speed is faster than consumption speed, which happens in many cases. For more details regarding these issues, please check <a href="">this article</a>.</p><p><code>main</code> will be responsible for invoking <code>etl</code>. While <code>main</code> itself is not asynchronous, it uses <code></code> to execute an asynchronous function. After <code>etl</code> is over, any code after <code></code> would be executed as regular synchronous code.</p><p>Producer has a fairly straightforward implementation:</p><pre><code>async def producer(queue): connection = await get_asyncpg_connection() async for record in extract(connection): await queue.put(record) await queue.put(None) await connection.close()</code></pre><p>We start by grabbing an <code>asyncpg</code> connection object. Once attained, we create a loop that fetches data from our database and return the records via <code>extract</code> function. Each record is first inserted into queue then processed by consumer, until there is no more data to fetch.<strong> </strong>To signalize its job is done, producer will then insert a <code>None</code> value into the queue, which must be handled gracefully by consumer. Lastly, we close the <code>asyncpg</code> connection.</p><p>Regarding both <code>get_asyncpg_connection</code> and <code>extract</code> functions, let’s dive into their implementations.</p><pre><code>import asyncpg async def get_asyncpg_connection(): db_conf = { "user": "user_name", "password": "password", "database": "db_name", "host": "host", "port": "port", } connection = await asyncpg.connect(**db_conf) return connection async def extract(connection): async with connection.transaction(): query = "SELECT name, age FROM input_table" async for record in connection.cursor(query, prefetch=1000): yield record</code></pre><p><code>get_asyncpg_connection</code> is the only part of the code that explicitly interacts with the <code>asyncpg</code> module. It creates a connection with our database using the <code>asyncpg.connect</code> method. This method's parameters are quite similar to the ones found at <code>psycopg2.connect</code>. To simplify this post, we opted to instantiate a <code>db_conf</code> variable inside <code>get_asyncpg_connection</code>, using dummy values for the connection parameters. When adapting contexts, one would simply change values into real ones.</p><p>The <code>extract</code> function is a generator that receives an open <code>asyncpg</code> connection as its single parameter. It's responsible for accessing a table in our database (in our case, the <code>input_table</code> table) via the <code>cursor</code> method inside <code>connection</code>. This fetches records from the database according to the query defined in <code>extract</code> and yields each grabbed row.</p><p>Now it’s time to talk about <code>consumer</code> implementation:</p><pre><code>def transform(batch): transformed_batch = [] for record in batch: first_name, last_name = record["name"].split(" ", 1) age = record["age"] transformed_batch.append((first_name, last_name, age)) return transformed_batch async def load(batch, connection): async with connection.transaction(): columns = ("first_name", "last_name", "age") await connection.copy_records_to_table( "output_table", records=batch, columns=columns ) async def task_set_load_helper(task_set, connection): for future in task_set: await load(await future, connection) async def consumer(loop, pool, queue): connection = await get_asyncpg_connection() task_set = set() batch = [] while True: record = await queue.get() if record is not None: record = dict(record) batch.append(record) if queue.empty(): task = loop.run_in_executor(pool, transform, batch) task_set.add(task) if len(task_set) &gt;= pool._max_workers: done_set, task_set = await asyncio.wait( task_set, return_when=asyncio.FIRST_COMPLETED ) await task_set_load_helper(done_set, connection) batch = [] if record is None: break if task_set: await task_set_load_helper( asyncio.as_completed(task_set), connection ) await connection.close()</code></pre><p>The <code>consumer</code> function will be responsible for grabbing the records our producer stores in the queue and add them to a batch. We must remember to convert each record into a <code>dict</code> before adding it to the batch in order to process data later - not doing so would raise a <code>TypeError: can't pickle asyncpg.Record objects</code> exception. Whenever the queue becomes empty, consumer will start running the transformation step over the batch it built.</p><p>The <code>transform</code> function receives a batch from the consumer, which is nothing more than a <code>list</code> of <code>dict</code> objects fetched from the database. Since we want to split the full names from each record into first and last names, we start by looping over the batch and grabbing every record. Our context is simple, so the transformation is pretty straightforward. After obtaining the transformed data we create a <code>tuple</code> containing each value and finally add it to a <code>list</code> representing a modified batch.</p><p>As applications are more complex and involve multiple output tables, it’s recommended to keep track of the columns and which table the records should be inserted into. This can be achieved by using a <code>dict</code> to group records from each table. Since our use-case involves a single output table, using a <code>list</code> of <code>tuple</code> is enough.</p><p>When we use <code>run_in_executor</code> method on line 30, our transformation step runs inside a given executor (in our case it’ll run in <code>pool</code>, which is the instance of <code>ProcessPoolExecutor</code> we've created on <code>etl</code>). Since transformation function is CPU-bound, if we executed <code>transform(batch)</code> directly here we would halt other coroutines until our batch has been fully processed, therefore dragging execution time.</p><p>Moreover, <code>run_in_executor</code> returns a <code>Future</code> object, not a processed batch. According to the Python docs, <a href="">“a <code>Future</code> represents an eventual result of an asynchronous operation”</a>, meaning that our data won’t necessarily be ready when line 30 is executed, but will be at some point in the future. This means <code>transform</code> will be running in background, so we can move on to process next batches concurrently.</p><p>The conditional block starting on line 32 prevents the application running out of memory by storing too many transform results. Whenever <code>task_set</code> has more objects than our pool has of available workers, we can speed things up and process some tasks to obtain transformation results earlier. Since they’re <code>Future</code> objects, we must wait for them to be completed. After calling <code>asyncio.wait</code>, we’ll wait for a task to become ready. When this happens, we grab a new <code>done_set</code> containing all completed tasks and update <code>task_set</code> to keep track only of the pending ones.</p><p>As we now have completed tasks, we can store the results in the database. Since we are going to use our storing procedures elsewhere, we created a <code>task_set_load_helper</code> helper function to avoid code repetition. It’s responsible for iterating over a set of completed tasks and calling the <code>load</code> function that will effectively write into database.</p><p>Whenever <code>load</code> is called from within <code>task_set_load_helper</code>, it will receive a <code>list</code> with transformed data and store it into <code>output_table</code> in our database. To achieve this, we use the <code>copy_records_to_table</code> method from <code>asyncpg</code> connection object to bulk insert the records, <a href="">which is faster than inserting each record individually</a>. It receives a table to write data into and a <code>list</code> of <code>tuple</code> objects representing the records to insert. It may additionally receive an iterable with the columns from the table. It’s important to note that each column in this iterable must match the order of the respective value in the objects from the <code>records</code> parameter.</p><p>When there are no more records to process, we finish creating batches and move on to obtain the last transformation results, in case there are still some objects to work with. We perform a similar procedure to what was done on line 36, but now we have to pass the entire set with the remaining tasks, which could still be pending. That means we must apply the <code>asyncio.as_completed</code> function over <code>task_set</code> to be assured that we’ll be iterating exclusively over completed tasks as soon as they become available.</p><h1 id="caveats">Caveats</h1><p><a href="">SQL injection</a> is one of the main concerns when using <code>asyncpg</code>. When using <code>psycopg2</code>, we can avoid this issue by composing our queries using the <code>psycopg2.sql</code> module objects, such as in the example below:</p><pre><code>from psycopg2 import sql table_name = "input_table" column_name = "name" value = "foo" query = sql.SQL( "SELECT * FROM {table_name} WHERE {column_name}={value}" ).format( table_name=sql.Identifier(table_name), column_name=sql.Identifier(column_name), value=sql.Literal(value) )</code></pre><p>By using the <code>sql.Identifier</code> and <code>sql.Literal</code> we can respectively sanitize identifiers (i.e.: table and column names) and literals (i.e.: strings and integers) to build parameterized queries, while being assured that the resulting query won’t harm our database due to a maliciously crafted input.</p><p><code>asyncpg</code> allows us to build parameterized queries using <a href="">syntax similar to the one used by native PostgreSQL,</a> using the <code>$n</code> syntax to provide query arguments. However, since PostgreSQL queries don’t allow us to parameterize tables and columns names, we are stuck with using Python’s string formatting to compose queries with dynamic identifiers. Without proper care, this can lead to catastrophic results in case these identifiers are user-provided. A malicious user can send a value to our system that when processed can lead to a <code>DROP DATABASE</code> statement being executed.</p><p>Build safe systems. Have user profile in mind whenever designing permissions. Allowing admins to dynamically choose tables is sound, but granting an end-user the same privileges may be compromising.</p><p>In order to work around these issues we can look how <code>psycopg2</code> works behind the scenes with <code>sql.Identifier</code>.</p><pre><code>async def sanitize_identifier(identifier, connection): sanitized = await connection.fetchval("SELECT quote_ident($1)", identifier) return sanitized async def sanitize_table(table, connection): try: dirty_schema, dirty_table = table.split(".") sanitized_schema = await sanitize_identifier(dirty_schema, connection) sanitized_table = await sanitize_identifier(dirty_table, connection) sanitized = f"{sanitized_schema}.{sanitized_table}" except ValueError: sanitized = await sanitize_identifier(table, connection) return sanitized</code></pre><p>The magic happens on <code>sanitize_identifier</code>, where we establish a connection with the database (via an <code>asyncpg</code> connection object) and retrieve the result for the <code>quote_ident</code> PostgreSQL function, which helps us to sanitize input. Because table name is a literal and not an identifier in this case, notice that we are using the <code>$n</code> syntax to pass the dynamic value to our query.</p><p><code>sanitize_table</code> builds upon the previous function in order to cover situations where our table name is prefixed by a schema name. We have to sanitize both components separately, not as a single string. After cleaning, we can safely merge them again into a single value using Python’s formatting.</p><p>Below is an example of a more robust <code>extract</code> function, which would allow us to fetch data from our database filtering the results from a dynamic table and column.</p><pre><code>async def extract(table_name, column_name, value, connection) async with connection.transaction(): sanitized_table_name = sanitize_table(table_name, connection) sanitized_column_name = sanitize_identifier(column_name, connection) query = f"SELECT * FROM {sanitized_table_name} WHERE {sanitized_column_name}=$1" async for record in connection.cursor(query, value): yield dict(record)</code></pre><h1 id="more-info">More info</h1><p>Below are some approaches to our topic that were implemented on third-party solutions:</p><ul><li><code><a href="">aiomultiprocess</a></code> - A Python lib that also combines <code>asyncio</code> and <code>multiprocessing</code> and implements a solution quite similar to ours. Here's also a <a href=";">PyCon 2018 talk</a> that introduces this library.</li><li><code><a href="">pypeln</a></code> - A Python lib that abstracts the process when creating a concurrent data pipeline. </li></ul><p>Special thanks to <a href="">Rafael Carício</a> who kindly reviewed this post.</p>