# load_from_s3_to_mongo_table

## Overview

The `load_from_s3_to_mongo_table` task loads a file (currently supported: `.csv`, `.xls`, `.xlsx`) directly from an S3 bucket into a MongoDB table, with the option to perform actions in between.
## Transform method options

| option | values |
|---|---|
| `airgoodies_transform_method` | `print_table_contents` |
| `custom_transform_method` | `path.to.method`: the method must have the signature `Callable[[pandas.DataFrame], pandas.DataFrame]` (view examples below) |
| `output_table_name` | the name of the MongoDB collection to save the result into; defaults to `{dag_id}_output_table` |
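Any callable that accepts and returns a `pandas.DataFrame` satisfies the `custom_transform_method` contract. A minimal sketch (the function name and the column-renaming step are purely illustrative, not part of airgoodies):

```python
import pandas as pd


def normalize_columns(data: pd.DataFrame) -> pd.DataFrame:
    """Illustrative transform: lower-case all column names.

    Matches the required signature
    Callable[[pandas.DataFrame], pandas.DataFrame].
    """
    data.columns = [str(col).lower() for col in data.columns]
    return data  # The returned DataFrame is what gets written to MongoDB
```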
## Example YAML syntax

```yaml
example_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    airgoodies_transform_method: print_table_contents
    output_table_name: my_output_table
```

The above YAML defines a task that loads a file from the configured bucket, prints its contents, and exports it into a MongoDB table.
## Example with custom transform method

Suppose we want to load a file from an S3 bucket, modify its contents, and then save the output to MongoDB.

For this case, consider the following file, `population.csv`:

```csv
city,population,year
Athens,3153000,2021
Athens,3154000,2022
Patras,215922,2021
Patras,213984,2011
```

Instead of loading all the cities, we only want to keep the rows for Athens. To do this, we create our own custom method and link it to the task.
Consider the following project structure:

```
root/
 \_ dags/
    \_ my_dag.py
 \_ custom/
    \_ transform.py
```
We create a new function in `transform.py`:

```python
import logging

import pandas as pd


def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
    """
    Load the pandas DataFrame and remove the rows that are not
    related to Athens.
    """
    logger: logging.Logger = logging.getLogger('airflow.task')
    logger.info('Running custom transform method: keep_athens')
    data.drop(data[data['city'] != 'Athens'].index, inplace=True)
    logger.info(f'New data:\n{data}')
    return data  # Return the DataFrame so it is written to MongoDB
```
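Before wiring the transform into Airflow, it can be sanity-checked locally against the sample data. A standalone sketch (the function body is reproduced inline so the snippet runs on its own, with logging omitted for brevity):

```python
import io

import pandas as pd


def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
    """Remove the rows that are not related to Athens."""
    data.drop(data[data['city'] != 'Athens'].index, inplace=True)
    return data


# The sample population.csv contents from above
csv_text = """city,population,year
Athens,3153000,2021
Athens,3154000,2022
Patras,215922,2021
Patras,213984,2011
"""

result = keep_athens(pd.read_csv(io.StringIO(csv_text)))
print(result)
```

Only the two Athens rows survive the filter; this is the DataFrame the task would write to MongoDB.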
Now, we create a YAML file that defines our task in Airflow. The file should look like this:

```yaml
my_load_data_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    custom_transform_method: custom.transform.keep_athens # Path to our function
```