load_from_s3_to_mongo_table

Overview

The load_from_s3_to_mongo_table task loads a file (currently supported formats: .csv, .xls, .xlsx) directly from an S3 bucket into a MongoDB collection, and lets you perform transformation steps in between.

Transform method options

option                      | values
--------------------------- | ------
airgoodies_transform_method | print_table_contents
custom_transform_method     | path.to.method: the method must have the signature Callable[[pandas.DataFrame], pandas.DataFrame] (see the examples below)
output_table_name           | the name of the MongoDB collection to save the result into; defaults to {dag_id}_output_table
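
For reference, a custom transform method is a plain Python function that receives the loaded DataFrame and returns the DataFrame to be written to MongoDB. A minimal pass-through sketch (the function name here is hypothetical):

import pandas as pd


def my_transform(data: pd.DataFrame) -> pd.DataFrame:
    # Receive the DataFrame loaded from S3 and return the (possibly modified)
    # DataFrame that will be written to the MongoDB collection.
    return data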

Example YAML syntax

example_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    airgoodies_transform_method: print_table_contents
    output_table_name: my_output_table

The above YAML defines a task that will load a file from the configured bucket, print its contents and write the result to the my_output_table MongoDB collection.
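
Conceptually, the task performs the following steps. This is not the library's actual implementation, only a rough sketch; the bucket, key, database and connection values are hypothetical placeholders:

from io import BytesIO

import boto3
import pandas as pd
from pymongo import MongoClient

# 1. Download the file from S3 (bucket and key are hypothetical placeholders).
s3 = boto3.client('s3')
body = s3.get_object(Bucket='my-bucket', Key='my_file.csv')['Body'].read()
data = pd.read_csv(BytesIO(body))

# 2. Apply the configured transform; print_table_contents simply prints the DataFrame.
print(data)

# 3. Write the result into the configured MongoDB collection.
client = MongoClient('mongodb://localhost:27017')  # hypothetical connection string
client['my_database']['my_output_table'].insert_many(data.to_dict('records'))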

Example with custom transform method

Let's say we want to load a file from an S3 bucket, modify its contents and then save the output to MongoDB.

For this case, consider the following file, population.csv:

city,population,year
Athens,3153000,2021
Athens,3154000,2022
Patras,215922,2021
Patras,213984,2011

Instead of loading all the cities, we want to keep only the rows for Athens.

To do that, we create our own custom transform method and reference it in the task configuration.

Considering the following structure:

root/
    \_ dags/
        \_ my_dag.py
        \_ custom/
            \_ transform.py

We create a new function in transform.py:

import pandas as pd
import logging


def keep_athens(data: pd.DataFrame) -> pd.DataFrame:
    """
    Receives the loaded pandas DataFrame and removes the rows that are not
    related to Athens.
    """
    logger: logging.Logger = logging.getLogger('airflow.task')
    logger.info('Running custom transform method: keep_athens')

    # Keep only the rows where the city is Athens
    data.drop(data[data['city'] != 'Athens'].index, inplace=True)

    logger.info(f'New data:\n{data}')
    return data  # Return the DataFrame to write it in MongoDB
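
To sanity-check the function outside of Airflow, you can run it locally against the sample data (a quick test sketch, assuming custom.transform is importable from your working directory):

import pandas as pd

from custom.transform import keep_athens

sample = pd.DataFrame({
    'city': ['Athens', 'Athens', 'Patras', 'Patras'],
    'population': [3153000, 3154000, 215922, 213984],
    'year': [2021, 2022, 2021, 2011],
})

print(keep_athens(sample))  # only the two Athens rows should remain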

Finally, we need to create the YAML configuration that defines our task in Airflow. The file should look like this:

my_load_data_task:
  airgoodies_task: load_from_s3_to_mongo_table
  config:
    custom_transform_method: custom.transform.keep_athens # Path to our function
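
Since output_table_name is not set here, the result is stored in the default {dag_id}_output_table collection. After the DAG run you can inspect it with pymongo; the connection string, database name and DAG id below are hypothetical:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')  # hypothetical connection string
collection = client['my_database']['my_dag_output_table']  # assuming the DAG id is my_dag

for document in collection.find():
    print(document)  # one document per row of the filtered DataFrame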