Introduction
airgoodies is a utility library for Apache Airflow that simplifies development by removing a lot of boilerplate code from the end user.
Using airgoodies helps keep your DAGs smaller and more maintainable.
Installation
airgoodies is available for specific Apache Airflow and Python versions.
Please view the matrix below to choose the one that best suits you:
Airgoodies Version | Apache Airflow Version | Python Version | Project tag |
---|---|---|---|
0.0.4 | 2.7.2 | 3.11 | v0.0.4 |
0.0.3 | 2.7.2 | 3.11 | v0.0.3 |
0.0.2 | 2.7.2 | 3.11 | v0.0.2 |
0.0.1-alpha | 2.7.2 | 3.11 | v0.0.1-alpha |
Keep in mind that these are only the versions that have been tested and are known to work; other combinations are not necessarily excluded, so a version that is not documented here may still work for you.
airgoodies can be installed in different ways:
- Install using PyPi directly into your Airflow deployment: Install using PyPi
- Install directly from sources: Install from Sources
Install using PyPi
airgoodies is available directly from PyPi and can be installed on your Apache Airflow deployment by updating your requirements.txt:
# requirements.txt
airgoodies==0.0.4
For all the available versions please check here
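If you want to confirm which version ended up installed in your environment, a minimal check using Python's standard importlib.metadata (no airgoodies-specific API is assumed here) is:
import importlib.metadata

# Prints the installed airgoodies distribution version, e.g. 0.0.4
print(importlib.metadata.version('airgoodies'))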
Install from Sources
TODO
Features overview
Task generation using YAML Files
airgoodies can dynamically create tasks from YAML files with a specific syntax.
Currently, the supported syntax offers the following commands:
$task_name: # This is the name that will appear for the task (task_id)
  $task: <task> # The task to execute
  config: # The configuration of the task
    $options: $values
    airgoodies_transform_method | custom_transform_method: $method_name
Options for $task:
option | explanation |
---|---|
airgoodies_task | use a predefined airgoodies task from this list |
custom_task | run a custom python task, by providing the path of the callable |
Options for config:
config holds the configuration of the task; the available options depend on the $task option you have chosen.
Pre-defined airgoodies tasks
Airgoodies supports various operations out of the box, requiring minimal configuration.
- load_from_s3_to_mongo_table: loads a file from an S3 bucket directly into a MongoDB table, with the option to edit the data before it is saved.
load_from_s3_to_mongo_table
Overview
The load_from_s3_to_mongo_table task loads a file (currently supported: .csv, .xls, .xlsx) directly from an S3 bucket into a MongoDB table, offering the ability to perform actions in between.
Transform method options
option | values |
---|---|
airgoodies_transform_method | print_table_contents |
custom_transform_method | path.to.method : the method must have the signature Callable[[pandas.DataFrame], pandas.DataFrame] (View examples below) |
output_table_name | the name of the MongoDB collection to save the result into, default is {dag_id}_output_table |
Example YAML syntax
example_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    airgoodies_transform_method: print_table_contents
    output_table_name: my_output_table
The above YAML defines a task that will load a file from the configured bucket, print its contents and export it into a MongoDB table.
Example with custom transform method
Let's say we want to load a file from an S3 Bucket, modify the contents and then save the output to MongoDB.
For this case, consider the following file population.csv:
city,population,year
Athens,3153000,2021
Athens,3154000,2022
Patras,215922,2021
Patras,213984,2011
Instead of loading all the cities, we only want to load the rows for Athens.
In this case we need to create our own custom method and link it.
Considering the following structure:
root/
 \_ dags/
     \_ my_dag.py
     \_ custom/
         \_ transform.py
We create a new function in transform.py:
import pandas as pd
import logging


def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
    """
    Function that loads the pandas DataFrame and removes the lines that are not
    related to Athens.
    """
    logger: logging.Logger = logging.getLogger('airflow.task')
    logger.info('Running custom transform method: keep_athens')

    # Keep only the rows related to 'Athens'
    data.drop(data[data['city'] != 'Athens'].index, inplace=True)
    logger.info(f'New data:\n{data}')

    return data  # Return the DataFrame to write it in MongoDB
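To sanity-check the transform outside Airflow, you can call it on a small DataFrame built from the sample data above; a local sketch, assuming it is run from inside the dags/ directory so that the custom package is importable:
import pandas as pd
from custom.transform import keep_athens

# Build a small DataFrame matching population.csv
df = pd.DataFrame({
    'city': ['Athens', 'Athens', 'Patras', 'Patras'],
    'population': [3153000, 3154000, 215922, 213984],
    'year': [2021, 2022, 2021, 2011],
})

result = keep_athens(df)
print(result)  # Only the two Athens rows should remain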
Now, we need to create a YAML file that creates our task in Airflow.
The file should look like this:
my_load_data_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    custom_transform_method: custom.transform.keep_athens # Path to our function
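Since the value of custom_transform_method is resolved as a dotted Python path at runtime, it can be worth verifying locally that the path resolves before deploying; a quick sketch, again assuming it is run from inside the dags/ directory:
import importlib

# Import the module referenced by the dotted path and check that the function exists
module = importlib.import_module('custom.transform')
assert hasattr(module, 'keep_athens'), 'keep_athens is missing from custom/transform.py'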
airgoodies examples
Description
The following example demonstrates how to create a project from scratch, using a dockerized Airflow project, an S3 Bucket and a MongoDB connection.
In this example we will demonstrate how to use airgoodies to quickly set up a workflow that loads a .csv file from our S3 Bucket, filters its data to extract what we need and afterwards saves the result into a MongoDB table.
You can find the source code for this example here
Prerequisites
In order to execute the following example locally, we need to have:
- a valid Docker installation
- a docker-compose installation
- an Amazon S3 bucket
Basic setup
The basic setup of the project begins with the structure.
We create a new project directory:
mkdir my_project && cd my_project
In this project we need to create the folder structure shown below:
my_project/
 \__ Dockerfile
 \__ docker-compose.yaml
 \__ requirements.txt
 \__ dags/
     \__ __init__.py
     \__ my_dag.py
     \__ custom/
         \__ __init__.py
         \__ transform.py
To generate this structure we run the following command:
touch Dockerfile
touch docker-compose.yaml
touch requirements.txt
mkdir dags/
cd dags
touch __init__.py
touch my_dag.py
mkdir custom
cd custom
touch __init__.py
touch transform.py
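Alternatively, if you prefer to script this step in Python instead of running the shell commands one by one, an equivalent pathlib sketch (assumed to be run from inside my_project/) is:
from pathlib import Path

# Files at the project root
for name in ('Dockerfile', 'docker-compose.yaml', 'requirements.txt'):
    Path(name).touch()

# The dags/ package with the custom/ sub-package
Path('dags/custom').mkdir(parents=True, exist_ok=True)
for name in ('dags/__init__.py', 'dags/my_dag.py',
             'dags/custom/__init__.py', 'dags/custom/transform.py'):
    Path(name).touch()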
Create the Dockerfile
Inside the Dockerfile we paste the following:
# Custom image for Airflow 2.7.2 and Python version 3.11
FROM apache/airflow:2.7.2-python3.11
COPY requirements.txt .
# Upgrade PIP
RUN pip install --upgrade pip
# Install the requirements for local dev
RUN pip install -r requirements.txt
This will create an Apache Airflow v2.7.2 image with Python 3.11.
Next, open our docker-compose.yaml and paste the content from the example's code.
This will create a full Airflow deployment locally with an associated MongoDB for our example.
Create the requirements.txt
Before starting anything on docker, we need to add our requirements.
For that, we open our requirements.txt and insert the following:
# Add the requirement for `airgoodies`
airgoodies==0.0.4
Starting the instance
Now, all we have to do is build and start our airflow deployment:
docker-compose build
docker-compose up -d
After a while, we can navigate to http://localhost:8080 and use username: airflow and password: airflow to view the console.
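If you prefer to check readiness from a script instead of refreshing the browser, the webserver exposes a /health endpoint; a minimal stdlib-only sketch, assuming the default setup where /health requires no authentication:
import json
import urllib.request

# Query the Airflow webserver health endpoint
with urllib.request.urlopen('http://localhost:8080/health') as response:
    health = json.load(response)

print(health)  # Reports the status of the metadatabase and the scheduler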
Setting up the environment
The first step is to configure the variables used by the DAG, and specifically by the airgoodies modules that we are going to use.
We navigate to the Apache Airflow -> Admin -> Variables page, select the Choose file option, and insert the following file (airgoodies-variables.json):
{
  "csv_loader.airgoodies-mongo-db-connection-url": "mongodb://root:root@mongo:27017",
  "csv_loader.airgoodies-mongo-db-name": "data",
  "csv_loader.airgoodies-aws-s3-connection-name": "s3_conn",
  "csv_loader.airgoodies-aws-s3-default-bucket": "<s3-bucket-name>",
  "csv_loader.airgoodies-dag-config-file-key": "csv_loader_dag_config.yaml",
  "csv_loader.airgoodies-task-aws-load-from-s3-to-mongo-input-file-variable": "input_file"
}
(Replace <s3-bucket-name> with the name of the S3 Bucket you selected for the example.)
Afterwards, select the Import Variables option to insert the Variables into our Airflow instance.
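If you prefer to script this step instead of using the UI, the same variables can also be created through Airflow's stable REST API; a rough sketch, assuming the requests package is installed locally and the basic-auth API backend is enabled (as in the official docker-compose setup):
import json
import requests

# Push every entry of airgoodies-variables.json as an Airflow Variable
with open('airgoodies-variables.json') as f:
    variables = json.load(f)

for key, value in variables.items():
    response = requests.post(
        'http://localhost:8080/api/v1/variables',
        auth=('airflow', 'airflow'),
        json={'key': key, 'value': value},
    )
    response.raise_for_status()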
The next step is to create the S3 connection defined in the csv_loader.airgoodies-aws-s3-connection-name variable.
We simply navigate to Apache Airflow -> Admin -> Connections -> New Connection and create the connection as shown:
We insert the AWS Access Key ID and AWS Secret Access Key to allow Airflow to access S3 resources, and select Save.
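Before moving on, it can be worth confirming that the credentials you just entered can actually reach the bucket; a quick local sketch using boto3 (an extra assumption of this guide, it is not required by the example itself):
import boto3

bucket_name = '<s3-bucket-name>'  # The bucket configured in airgoodies-variables.json

# Uses whatever AWS credentials are configured locally (environment variables, ~/.aws, ...)
s3 = boto3.client('s3')
s3.head_bucket(Bucket=bucket_name)  # Raises an error if the bucket is unreachable
print(f'Bucket {bucket_name} is reachable')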
Creating the DAG
In order to create a DAG, we copy the following code into dags/csv_loader.py:
from airflow import DAG
from airflow.utils.dates import days_ago
from airgoodies.command.parser import CommandParser
from airflow.operators.python import PythonOperator

with DAG(
        dag_id='csv_loader',  # used as prefix for all variables in `airgoodies-variables.json`
        schedule_interval=None,
        start_date=days_ago(2),
        default_args={
            'owner': 'airflow'
        },
        catchup=False,
        tags=['csv_loader', 'aws', 'mongo', 'airgoodies:examples'],
) as dag:
    # Get the logger
    import logging

    logger: logging.Logger = logging.getLogger('airflow.task')

    # Initialize the command parser
    command_parser: CommandParser = CommandParser(dag=dag)

    tasks: list[PythonOperator] = []

    # Parse the commands from the `csv_loader_dag_config.yaml` in S3
    # and chain the generated operators one after another
    for _task in command_parser.get_commands():
        tasks = tasks >> _task.to_operator(dag=dag)

    tasks
This snippet of code initializes a DAG and uses the fundamental airgoodies utility, CommandParser, which will dynamically create our tasks from the YAML file that we will create in the next step.
Creating the configuration file (csv_loader_dag_config.yaml)
Now, as we said earlier, airgoodies has the ability to dynamically generate our tasks by using a YAML file instead of Python code.
In our case, we want to load a file from S3, parse it and save it to our MongoDB. For this, airgoodies already provides a pre-defined task that handles all the boilerplate functionality, called load_from_s3_to_mongo_table.
Based on the above documentation, we need to create a new file, let's name it csv_loader_dag_config.yaml as specified in our Variable csv_loader.airgoodies-dag-config-file-key, and insert the configuration:
load_from_s3_and_save: # This will appear as our task_id in airflow
  airgoodies_task: load_from_s3_to_mongo_table # The airgoodies pre-defined task name
  config: # The configuration of the task
    custom_transform_method: custom.transform.keep_athens # Use our custom transform method
    output_table_name: my_output_table # The name of the collection to save the result into
Save and upload it to our S3 Bucket.
Creating the input data (population.csv)
The next step is to create the data for the demo: simply create a new file population.csv and insert the following data:
city,population,year
Athens,3153000,2021
Athens,3154000,2022
Patras,215922,2021
Patras,213984,2011
Save, and upload it to the S3 bucket as well.
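If you would rather script the two uploads than use the AWS console, a minimal boto3 sketch (again assuming your local AWS credentials can write to the bucket) could look like this:
import boto3

bucket_name = '<s3-bucket-name>'  # The bucket configured in airgoodies-variables.json
s3 = boto3.client('s3')

# Upload the task configuration and the input data to the root of the bucket
for filename in ('csv_loader_dag_config.yaml', 'population.csv'):
    s3.upload_file(filename, bucket_name, filename)
    print(f'Uploaded {filename} to s3://{bucket_name}/{filename}')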
Executing and troubleshooting
If everything is set up correctly up until now, we should be able to see a new DAG inside our Airflow home page:
On the far right corner we select Trigger DAG w/ config:
We insert the following configuration and select Trigger:
{
  "input_file": "population.csv"
}
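The same run can also be triggered programmatically through the stable REST API; a rough sketch, again assuming the requests package and the basic-auth API backend:
import requests

# Trigger the csv_loader DAG with the same run configuration
response = requests.post(
    'http://localhost:8080/api/v1/dags/csv_loader/dagRuns',
    auth=('airflow', 'airflow'),
    json={'conf': {'input_file': 'population.csv'}},
)
response.raise_for_status()
print(response.json())  # Details of the newly created DAG run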
After waiting a bit, we will notice that our task is failing with the following message:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 209, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airgoodies/task/aws_tasks.py", line 65, in load_from_s3_to_mongo_table
transform_method: Callable[[DataFrame], DataFrame] = getattr(
^^^^^^^^
AttributeError: module 'custom.transform' has no attribute 'keep_athens'
The above message is expected, because we have defined the method custom.transform.keep_athens as our custom_transform_method but we have not created it yet.
To fix this, we need to edit our dags/custom/transform.py and add the following code:
import pandas as pd
import logging


def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
    """
    Function that loads the pandas DataFrame and removes the lines that are not
    related to Athens.
    """
    logger: logging.Logger = logging.getLogger('airflow.task')
    logger.info('Running custom transform method: keep_athens')

    # Remove each line not related to 'Athens'
    data.drop(data[data['city'] != 'Athens'].index, inplace=True)
    logger.info(f'New data:\n{data}')

    return data  # Return the DataFrame to write it in MongoDB
Save, wait a bit and re-execute the example. Now everything should execute correctly:
To verify the result, we need to log in to our MongoDB (mongodb://root:root@mongo:27017) and check the saved content of our collection my_output_table in the data database.
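If you prefer to verify from a script instead of a Mongo shell, a small pymongo sketch (assuming port 27017 is exposed to the host by the docker-compose file) would be:
from pymongo import MongoClient

# Connect to the example MongoDB instance from the host machine
client = MongoClient('mongodb://root:root@localhost:27017')
collection = client['data']['my_output_table']

# Print the documents written by the DAG (only the Athens rows are expected)
for document in collection.find():
    print(document)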
The result should look like this:
Conclusion
This is just a short example of the capabilities of airgoodies, and it was created using airgoodies version v0.0.4.
Author
Stavros Grigoriou (stav121)