“The goal is to turn data into information, and information into insight.” — Carly Fiorina, Former executive, president, and chair of Hewlett-Packard Co
In this day and age, data modeling is no longer a prerequisite to collecting data or extracting value from it. Raw or unstructured data has become extremely easy to store and everyone can start collecting data from various sources and at large scale without knowing what value it has. That said, and as the quote suggests, to make the most out of your data, you must define, model and structure (at least some of) it.
Working with raw data is a task suited only to highly skilled data engineers or data analysts. That means many such employees need to be hired and, on top of that, every new business question will consume a lot of those employees’ time to define, investigate and answer.
At SeaData, we provide the products and solutions to meet those needs. Say you have a batch of files coming into an AWS S3 bucket. How would you parse those files? How would you keep track of the files you have already processed? What would be the failsafe if an error occurs? How and where would you structure those files to reduce costs and enable querying them?
That was exactly the case we had to work on for one of our clients. Our solution was to create an automatic workflow that parses the raw files, transforms them into structured data and inserts it into a database where it can be queried for insights. The best thing about it was that it all happened in house, using only AWS services to improve speed and security, and without creating any EC2 instances or clusters that the client needs to support or maintain.
What is AWS?
Our solution was built on AWS services such as S3, Redshift, Lambda and MWAA.
By relying solely on AWS, we were able to improve speed, stability, and security, while almost completely eliminating the need for maintenance and support. That is because AWS provides those on its own!
The concepts of S3, Redshift and Lambda are fairly well known, and if not, a short read will give you all you need to know. That is not the case with MWAA.
What is MWAA?
MWAA stands for Managed Workflows for Apache Airflow. It provides Apache Airflow as a managed service, hosted on Amazon’s machines. That brings us to the question: what is Apache Airflow? In short, it is an open-source platform for authoring, scheduling and monitoring workflows in Python.
For our use case, we used Airflow to design ETL pipelines and trigger them on specific schedules and in a specific order. Each of those pipelines is represented as a DAG (Directed Acyclic Graph) inside Airflow.
By using MWAA instead of installing Airflow on a server, we were able to improve security and reduce the costs of maintaining and supporting it to a minimum, while still gaining the flexibility we needed for our code to run.
Our client had different sorts of files landing in their S3 buckets. Some were zipped while others weren’t; some were in CSV format while others were in JSON, and so on.
The design we suggested is simple yet powerful:
- A file is created inside an S3 bucket.
- A Lambda function is triggered to collect the file’s metadata and log it into a Redshift table.
- The file’s DAG is triggered to run an ETL pipeline with these tasks:
  - Check if new files exist.
  - Filter for the newest file.
  - Mark the file as `In Progress`.
  - Create the corresponding table in Redshift if it does not exist.
  - Perform the logic to move data from the file to the Redshift table.
  - Log load information such as the number of corrupted rows, the number of successful insertions, etc.
  - Move the file to Archive.
  - Mark the file as `Done`.
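The task list above forms a simple chain, which is the most basic kind of DAG. As a rough illustration of the ordering idea only, the sketch below uses Python’s standard `graphlib` module rather than Airflow itself. The task names are hypothetical labels for the steps listed above, not the names used in the actual project.

```python
from graphlib import TopologicalSorter

# Hypothetical task names standing in for the steps listed above.
# Each key maps to the set of tasks that must finish before it can start.
PIPELINE = {
    "check_new_files": set(),
    "filter_newest_file": {"check_new_files"},
    "mark_in_progress": {"filter_newest_file"},
    "create_table_if_missing": {"mark_in_progress"},
    "load_file_to_redshift": {"create_table_if_missing"},
    "log_load_results": {"load_file_to_redshift"},
    "move_file_to_archive": {"log_load_results"},
    "mark_done": {"move_file_to_archive"},
}


def execution_order(graph):
    """Return one valid run order; graphlib raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    for step, name in enumerate(execution_order(PIPELINE), start=1):
        print(step, name)
```

In Airflow the same dependencies would be declared between tasks inside a DAG definition, and the scheduler would decide when each task may run.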
Setting the Infrastructure
Before getting into the heavy part of creating the DAGs, let’s examine what needs to be done to support such a workflow.
We needed to set up the S3 buckets and the Redshift cluster, of course, but that is easily done through the AWS UI. We also needed to create the Lambda function, add triggers to it, and build the MWAA environment.
To allow all the different services to work together in the most secure and cost-effective way, it’s important to note a few things:
- All services should be in the same VPC (Virtual Private Cloud).
- Each of the main services (apart from S3) needs to have a Security Group.
- All IAM Roles should be given correct permissions.
By placing all our services in the same VPC, we can get them to communicate with each other (via their private IP addresses) without any internet access, which makes our system extremely secure and allows us to avoid paying for a NAT Gateway!
When creating the Security Groups, we need to add inbound rules between our services so that, for example, the Lambda Security Group can communicate with the Redshift Security Group, and so on.
In addition to creating each of the services, we should also create an IAM Role specifically for that service’s needs. If, for example, we would like to use Secrets Manager to store the Redshift password or other sensitive values, we should add permissions to use it as well.
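As an illustration of the Secrets Manager permission mentioned above, here is a minimal sketch that builds an IAM policy document granting read access to a single secret. The `secretsmanager:GetSecretValue` action is the standard one for reading a secret; the ARN in the example is a made-up placeholder, and the policy actually used in the project is not shown in this article.

```python
import json


def secret_read_policy(secret_arn):
    """Build an IAM policy document that allows reading one specific secret."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue"],
                # Scope the permission to one secret instead of using "*".
                "Resource": [secret_arn],
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN, for illustration only.
    example_arn = (
        "arn:aws:secretsmanager:us-east-1:123456789012:secret:redshift-password-EXAMPLE"
    )
    print(json.dumps(secret_read_policy(example_arn), indent=2))
```

Attaching a policy like this to the Lambda’s role (and to the MWAA execution role, if the DAGs read the same secret) keeps each service limited to what it needs.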
Designing the Lambda Function
As soon as a new file lands inside an S3 bucket, we need a process to log that a new file was inserted, along with other useful information.
The best way to do so is to create a Lambda function and trigger it on every new file creation. Since we wanted to store each file’s metadata in a Redshift table, we needed to make sure the Lambda function could connect to Redshift internally (remember VPC and Security Groups?), and to avoid hard-coding the password, we used Secrets Manager to obtain it.
On AWS, you can write your Lambda in multiple languages (including Java, Node.js, Python, Ruby and .NET Core), and it may run for up to 15 minutes and use up to 10 GB of memory (true at the time of writing). Because of these limits, you want it to be quick and simple and to avoid any heavy lifting.
Our Lambda was straightforward:
- Store full path + name of the newly created file the Lambda was triggered on.
- Validate the file (e.g., it was inserted into the right folder, in the proper format, etc.).
- Store any additional metadata information such as file size, timestamp and so on.
- Set file status to `New`.
- Retrieve Redshift password using the Secrets Manager and set up a connection.
- Run an insert query to insert a new row with all metadata into the Redshift table.
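The steps above can be sketched roughly as follows. The nesting of the S3 event (`Records`, `s3.bucket.name`, `s3.object.key`, `eventTime`) follows the standard S3 notification format. Everything else is an assumption for illustration: the validation rules, the field names of the metadata row, and the `insert_row` callable that stands in for the Secrets Manager lookup and the Redshift insert.

```python
from urllib.parse import unquote_plus

# Hypothetical validation rules; the real folder and format checks are client-specific.
ALLOWED_PREFIX = "incoming/"
ALLOWED_SUFFIXES = (".csv", ".json", ".gz", ".zip")


def build_metadata(record):
    """Turn one S3 event record into a row for the metadata table."""
    bucket = record["s3"]["bucket"]["name"]
    # Object keys arrive URL-encoded in S3 event notifications.
    key = unquote_plus(record["s3"]["object"]["key"])
    is_valid = key.startswith(ALLOWED_PREFIX) and key.lower().endswith(ALLOWED_SUFFIXES)
    return {
        "file_path": f"s3://{bucket}/{key}",
        "size_bytes": record["s3"]["object"].get("size"),
        "event_time": record["eventTime"],
        # How invalid files are handled is not described here, so the flag is
        # only recorded and the decision is left to the caller.
        "is_valid": is_valid,
        "status": "New",
    }


def lambda_handler(event, context, insert_row=print):
    """Entry point. `insert_row` is a hypothetical stand-in for the Redshift insert."""
    rows = [build_metadata(record) for record in event.get("Records", [])]
    for row in rows:
        insert_row(row)
    return {"logged": len(rows)}
```

Keeping the metadata extraction in a plain function makes it easy to test locally with a hand-written event, without touching AWS.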
By using this Lambda and enabling a trigger on every new file created in the relevant buckets, we ensured that every new file goes through our process exactly once. That assured us that we won’t miss any file and we won’t get any duplicates either.
It’s worth noting that the Lambda doesn’t care which file triggered it, so one Lambda can be used for all files! Just remember to add triggers on all the relevant buckets.
Developing the DAGs
So far so good: we have a table in Redshift, let’s call it mng, that holds the metadata of all newly created files across our S3 buckets. Now comes the harder part. We need to define each file’s data, select the newest file of each type, process it whatever its format, apply any required logic to it and insert it into its own Redshift table. The goal is for this process to run automatically, of course, on a specified schedule, and with as much parallelism as possible.
The first part will probably take the most time. It’s when we need those highly skilled data engineers and data analysts to help model the file and create a proper schema for it. However, it’s done only once for each new file “type”, so all other new files from the same source will be handled automatically.
After we have a proper schema designed, we can start developing the tasks for our DAGs.
Extraction and Verification
Since we want the procedure to run automatically, we need a method to verify that a new file exists in the mng table, and if it doesn’t, abort. Plus, our client stated that in some scenarios, multiple files could be inserted at once and we need to use the most recent one.
To meet those needs, we used a special kind of Operator in Airflow called a Sensor. A sensor checks whether the specified criteria are met every x seconds. If the criteria are met, it moves on with the flow. If the criteria are not met within y seconds, the flow aborts (until it is triggered again automatically by the specified schedule).
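To make the polling behaviour concrete, here is a small plain-Python sketch of the same idea. It is not Airflow code. In Airflow sensors, the x and y above correspond to the `poke_interval` and `timeout` arguments, and the names are reused here for that reason. The `sleep` and `clock` parameters are additions that make the loop easy to test.

```python
import time


def wait_for(criteria, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Poll `criteria` until it returns a truthy value or `timeout` seconds pass.

    Returns True if the criteria were met, False if the wait timed out.
    """
    start = clock()
    while not criteria():
        if clock() - start >= timeout:
            return False  # give up; the next scheduled run will try again
        sleep(poke_interval)
    return True


if __name__ == "__main__":
    attempts = []

    def file_has_arrived():
        # Hypothetical check that succeeds on the third poll.
        attempts.append(1)
        return len(attempts) >= 3

    print(wait_for(file_has_arrived, poke_interval=0.1, timeout=5))
```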
Because our criteria were a query over the Redshift mng table, we used a SqlSensor, which can connect to Redshift and run any query we like. In addition, we added a way to capture the result of that query and pass it to other tasks using Airflow’s XCom (the Airflow documentation covers XCom in more detail).
Our Sensor queried for any files of a specific “type” with status `New` and returned all of the relevant metadata on them, so we could later filter based on timestamp or UUID to retrieve the most recent file and set all others to `Filtered Out` status. The chosen file was later set to status `In Progress`.
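The filtering step can be sketched as a small pure function. This is a simplified illustration: the dictionary keys (`file_path`, `event_time`, `status`) are hypothetical names for the metadata columns, and both status changes are applied in one place here, whereas the pipeline described above sets `In Progress` in a later task.

```python
from datetime import datetime


def pick_newest(files):
    """Split candidate files into the most recent one and the rest.

    Returns (chosen, skipped) as new dicts; the input rows are not modified.
    Timestamps must be comparable with each other, e.g. datetime objects.
    """
    if not files:
        return None, []
    newest = max(files, key=lambda f: f["event_time"])
    chosen = {**newest, "status": "In Progress"}
    skipped = [{**f, "status": "Filtered Out"} for f in files if f is not newest]
    return chosen, skipped


if __name__ == "__main__":
    candidates = [
        {"file_path": "a.csv", "event_time": datetime(2021, 1, 1), "status": "New"},
        {"file_path": "b.csv", "event_time": datetime(2021, 1, 3), "status": "New"},
    ]
    chosen, skipped = pick_newest(candidates)
    print(chosen["file_path"], [f["file_path"] for f in skipped])
```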
Manipulation and Loading
Now we have a single, most recent file, marked as `In Progress`. Remember those highly skilled data engineers and data analysts who defined the proper schema for such a file? Now we can use this schema to create a table in Redshift that will store the file’s data. Depending on the client’s needs, we can make sure the table exists, append the data, create a new table, truncate an existing one, and so on.
To perform the data migration, we can do all sorts of things, depending on the client’s needs. In our case, a lot of the tables were mirror tables, i.e., an exact copy of the files’ data. That allowed us to utilize one of Redshift’s most powerful tools: the Redshift Copy Command.
The Copy Command loads data into Redshift from all sorts of sources inside AWS, including S3. The reason it’s so powerful is that the load is distributed automatically across the cluster’s resources and runs in parallel. On top of that, by providing simple parameters, it can handle almost any sort of file and any sort of data within it. The file may be encrypted, zipped, in JSON or in CSV. It can contain blanks, different date formats, uncommon encodings, empty or missing data, and still, by providing the correct option parameters, the Copy Command will handle it as fast as possible!
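As a rough sketch of how those option parameters come together, the helper below assembles a COPY statement as a string. The clauses used (`IAM_ROLE`, `FORMAT AS CSV`, `FORMAT AS JSON 'auto'`, `GZIP`, `IGNOREHEADER`, `BLANKSASNULL`, `EMPTYASNULL`, `DATEFORMAT`) are standard Redshift COPY parameters. The function itself, the table name, the S3 path and the role ARN are hypothetical, and this is not necessarily how the statements were built in the project.

```python
def build_copy_sql(table, s3_path, iam_role, file_format="CSV", options=()):
    """Assemble a Redshift COPY statement from a few per-file settings.

    Values are interpolated directly, so they must come from trusted
    configuration, never from user input.
    """
    if file_format.upper() == "JSON":
        format_clause = "FORMAT AS JSON 'auto'"
    else:
        format_clause = "FORMAT AS CSV"
    parts = [
        f"COPY {table}",
        f"FROM '{s3_path}'",
        f"IAM_ROLE '{iam_role}'",
        format_clause,
        *options,
    ]
    return "\n".join(parts) + ";"


if __name__ == "__main__":
    print(
        build_copy_sql(
            table="sales_mirror",  # hypothetical table
            s3_path="s3://example-bucket/incoming/sales.csv.gz",  # hypothetical path
            iam_role="arn:aws:iam::123456789012:role/example-redshift-copy",  # hypothetical
            options=("GZIP", "IGNOREHEADER 1", "BLANKSASNULL", "EMPTYASNULL", "DATEFORMAT 'auto'"),
        )
    )
```

Keeping the options as a per-file list makes it straightforward to store them alongside each file type’s schema.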
But how will you know if everything worked, and no data was lost? And if there were errors, how can you investigate them? Plus, what if your needs don’t require a mirror table but applying some logic to the data and then setting it in a staging table?
No need to worry. Airflow is so flexible that all of this can be done. Instead of using one of Airflow’s existing Operators to perform the Copy Command, we can simply write our own! That way, after the Copy Command finishes, we can gather information on how many rows were inserted and how many errors the file’s data produced, with exact information on where in the file each error occurred and what it was.
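The core of such a custom step might look like the sketch below. It assumes a DB-API style cursor connected to Redshift and relies on two Redshift features: the `pg_last_copy_count()` and `pg_last_copy_id()` functions, and the `stl_load_errors` system table. The function name and the shape of the returned report are hypothetical, and the project’s actual operator is not shown in this article. Note that rejected rows are only skipped and logged when the COPY is run with a `MAXERROR` value above zero; otherwise the COPY fails on the first bad row.

```python
# Per-row load errors for the most recent COPY in this session.
ERRORS_SQL = """
SELECT filename, line_number, colname, err_reason
FROM stl_load_errors
WHERE query = pg_last_copy_id()
ORDER BY line_number
"""


def copy_and_report(cursor, copy_sql):
    """Run a COPY through a DB-API cursor and summarize the outcome."""
    cursor.execute(copy_sql)

    # Number of rows loaded by the COPY that just ran.
    cursor.execute("SELECT pg_last_copy_count()")
    rows_loaded = cursor.fetchone()[0]

    cursor.execute(ERRORS_SQL)
    errors = [
        {
            # The text columns are fixed-width, so trailing padding is stripped.
            "file": filename.strip(),
            "line": line_number,
            "column": colname.strip(),
            "reason": err_reason.strip(),
        }
        for filename, line_number, colname, err_reason in cursor.fetchall()
    ]
    return {"rows_loaded": rows_loaded, "error_count": len(errors), "errors": errors}
```

The returned report can then be written to the mng table or pushed to XCom for later tasks.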
In case some logic needs to be applied to the data prior to loading it into Redshift, we can simply add another task to the DAG that performs it. Depending on the logic, we test and choose the right procedure to use: either loading into a temporary table and generating SQL queries to apply the logic, or using Python to read and parse the file while performing any sort of manipulation in the process.
What we did was a combination of both. For the client’s mirror tables, we used the Copy Command with specific option parameters for each file and logged the number of copied rows, as well as any errors that occurred, in our mng table. For the files that required some manipulation prior to loading them into staging tables, we created tasks to perform that logic and then continued with the load, either via the Copy Command or via Python libraries.
Cleaning Up and General Error Handling
After the entire procedure is completed, and the structured data now lives in its own Redshift table, it’s time to wrap up.
To avoid human errors, we decided that each file will be moved to an Archive directory that the Lambda won’t get triggered on and that its status will be changed to `Done` in the mng table.
However, what happens if there was an error in one of the tasks? Won’t the file get stuck in between?
No. We added a failsafe so that whenever a task fails, the status of the file is set to `Fail`. In addition, it sends an email notification and posts a Slack message to the people who will investigate what happened. That was the desired approach in our case, but Airflow is so flexible that almost anything is possible. From setting up timed retries or triggering a cleanup DAG to interacting with any third-party tool out there, if you can do it with Python, you can add it to the pipeline!
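The failsafe idea can be sketched in plain Python as a wrapper around a task. This is an illustration rather than the project’s code: `set_status` and `notify` are hypothetical callables standing in for the mng table update and the email or Slack message. In Airflow, similar logic can be attached to tasks through the `on_failure_callback` argument.

```python
def run_with_failsafe(task, file_id, set_status, notify):
    """Run `task`; on any exception mark the file as failed, alert, and re-raise."""
    try:
        return task()
    except Exception as exc:
        set_status(file_id, "Fail")
        notify(f"File {file_id} failed: {exc}")
        # Re-raise so the scheduler still sees the task as failed.
        raise


if __name__ == "__main__":
    statuses = {}

    def broken_task():
        raise ValueError("bad row")

    try:
        run_with_failsafe(broken_task, 42, statuses.__setitem__, print)
    except ValueError:
        print(statuses)
```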
We started with a bunch of files landing in our S3 buckets. It was an unused Data Lake with great potential. The files contained raw unstructured data in multiple formats. After the workflow completed, we had all the data structured in Redshift tables, ready to be queried. The best part is that this entire workflow runs automatically whenever you want it to, and as fast as possible, depending on the amount of data in the files.
Now that you have an operational Data Lake, you can add additional pipelines to work on the structured data and gain insights from it. The schema is known, and the data is defined, so every analyst, developer, or data scientist can work on gaining as much information as possible. Information that will be translated into insights.