Processing Huge CSV Files with AWS: Part II
Last time, we used multipart uploads to efficiently upload large (hundreds of GBs) files to an S3 bucket. Now that the file has arrived (yay), it’s time to process the data. A traditional RDBMS-oriented approach is to throw the results row-by-row into a database.
However, the right approach depends on the context of the data. For example, maybe you want to transform the data so that similar rows are “collapsed” into sums, or maybe you want to enrich it with calculated columns or data from another source…there’s a lot you might need to do before you’re ready for final processing.
Doing all of this by dumping (there’s that lovely word again) everything into SQL is possible, but we’re talking about a lot of data flowing into the system and a lot of overhead (physical and mental) to structure the schemas, potentially joining multiple tables depending on the complexity of the import.
That adds overhead to the whole stack -- either you need a separate bit of hardware to handle the import or you’re burning production resources on long-running processes. You’re also possibly taxing the DB layer, especially if part of the processing pipeline requires SQL queries.
We run into more issues if we need to touch the rows before we smash them into the relational database, as now we’re eating even more CPU time and memory. This is why ETL exists.
ETL pipelines are ideal for extracting data, transforming it, and re-saving (loading) it into another destination (like S3, SQS, or a database directly). There’s more frontloaded time to learn the syntax and process, but it can rescue you from writing a mess of procedural code and SQL and lets you process data at much greater scale.
There are many options for ETL (like EMR), but for now we are going to focus on AWS Glue. Sticky name, Amazon!
Let’s Do Some ETL!
Glue can scan our data, process it, and then store it wherever we want, including RDS for further analysis in SQL. The advantage is that we’ll be working in an ETL-oriented environment preloaded with common libraries. We can work in PySpark (the Python API for Apache Spark, which backs Glue) without any setup.
The first step with Glue is the crawler. Creepy. Setting one up can be as easy as pointing it at an S3 bucket and asking AWS to analyze the files. It will then build a schema as best it can based on the contents, all via the UI.
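If you’d rather not click through the console, the same crawler can be set up programmatically. Here’s a minimal sketch using boto3 -- the role ARN, crawler name, and bucket path are placeholders I’ve made up for this example, not anything Glue dictates:

import boto3

glue = boto3.client("glue")

# Create a crawler that scans the "imports" folder and infers a schema
# into the Glue Data Catalog. Role and names are hypothetical placeholders.
glue.create_crawler(
    Name="awi-imports-crawler",
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",
    DatabaseName="awi-imports",
    Targets={"S3Targets": [{"Path": "s3://awi-files/imports/"}]},
)

# Kick it off; once it finishes, the inferred table appears in the Data Catalog.
glue.start_crawler(Name="awi-imports-crawler")

Either way, the result is the same: a catalog table describing the CSV that the rest of the pipeline can read from.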
We’ll use Glue to ensure only valid columns are selected and that their format is consistent no matter what the client uploads. Then we’ll pipe the data into a relational database or back into S3. We might later want to enrich or filter this data and combine it with metadata from another file (which could even be in a different format), all of which is very easy to do with Glue.
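To make that “only valid columns, in a consistent format” step concrete: on the scripted side (which we’ll get to below), it maps onto Glue’s ApplyMapping transform, which selects, renames, and retypes columns in one pass. A minimal sketch, with column names that are purely illustrative:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import ApplyMapping

glueContext = GlueContext(SparkContext.getOrCreate())

# Read the table the crawler built (database/table names from this example).
raw_frame = glueContext.create_dynamic_frame.from_catalog(
    database="awi-imports", table_name="imports"
)

# Keep only the columns we care about and coerce them to known types;
# anything else the client uploads is simply dropped. Columns are hypothetical.
mapped = ApplyMapping.apply(
    frame=raw_frame,
    mappings=[
        ("product_id", "string", "product_id", "string"),
        ("parent_id", "string", "parent_id", "string"),
        ("price", "string", "price", "double"),
    ],
    transformation_ctx="mapped",
)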
Create a new ETL job -- select either “Spark script editor” or “Visual”. The UI-based workflow is good for simple ETL jobs that might not require much (or any) code. For example, let’s say I simply want to load every row of a CSV into a relational database so that I can run more structured operations against the data.
Of course we could write a simple script for this with a naive implementation, but can it scale to 1-2 TB of data? Glue ensures the process will scale no matter how big the file gets, and it can load the data faster because we control how many workers it uses.
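That “control how many workers” knob is literal: every Glue job has a worker type and worker count, configurable in the job details tab or from code. A sketch with boto3 -- the job name, role, and script location are placeholders:

import boto3

glue = boto3.client("glue")

# Register a job that runs our ETL script on 10 G.1X workers (4 vCPU / 16 GB each).
# The role, script path, and job name are hypothetical placeholders.
glue.create_job(
    Name="awi-import-job",
    Role="arn:aws:iam::123456789012:role/GlueJobRole",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://awi-files/scripts/import_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,  # bump this up for the 1-2 TB case
)

More workers means more parallel partitions of the CSV being read and written at once, which is where the speedup comes from.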
I will be adding some transformations later on, but for now, let’s see how easy it is to simply beam all our data into RDS while simultaneously creating another file in S3 in the columnar Apache Parquet format (useful if we wanted to run Athena queries against the raw CSV data).
As you can see, the setup is very simple. The source is the S3 data bucket -- an “imports” folder in this case, the same place we uploaded CSV files in the previous tutorial:
To set this up, we add two “target” nodes, then set the “parent” of each to the S3 source node. Now when we run the pipeline, our data is retrieved from S3 and multiple workers will reformat it and save it back to S3 and to an RDS instance running MySQL with zero work on our end. We know this will work at scale.
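Running the job from the console is one click, but if you’d rather trigger it from code (say, right after the multipart upload from Part I completes), boto3 can start and monitor the run. A quick sketch, reusing the hypothetical job name from above:

import boto3

glue = boto3.client("glue")

# Start a run of the job and check its status (job name is a placeholder).
run = glue.start_job_run(JobName="awi-import-job")
status = glue.get_job_run(JobName="awi-import-job", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])  # e.g. RUNNING, SUCCEEDED, FAILED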
This is already very useful…but let’s add an actual transformation. We can do this by adding a new transform node and re-parenting accordingly (so that the “data target” nodes use the new transform as their parent). This is easy enough to see with an example transformation that detects when the “parent_id” field is blank and substitutes the value of “product_id”.
As you can imagine, it’s not that difficult to “click together” multiple steps:
What About Scripting?
You can also build your pipeline through scripting, specifically in Python or Scala. Here’s the auto-generated script that matches the above workflow, to give an idea of how this works:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglueml.transforms import FillMissingValues

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="awi-imports",
    table_name="imports",
    transformation_ctx="S3bucket_node1",
)

# Script generated for node Fill Missing Values
FillMissingValues_node1695661985158 = FillMissingValues.apply(
    frame=S3bucket_node1,
    missing_values_column="parentid",
    output_column="productid",
    transformation_ctx="FillMissingValues_node1695661985158",
)

# Script generated for node MySQL
MySQL_node1695661312192 = glueContext.write_dynamic_frame.from_catalog(
    frame=FillMissingValues_node1695661985158,
    database="awi-imports",
    table_name="imports",
    transformation_ctx="MySQL_node1695661312192",
)

# Script generated for node Amazon S3
AmazonS3_node1695661708381 = glueContext.write_dynamic_frame.from_options(
    frame=FillMissingValues_node1695661985158,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://awi-files/results/", "partitionKeys": []},
    format_options={"compression": "snappy"},
    transformation_ctx="AmazonS3_node1695661708381",
)

job.commit()
As you can see, this isn’t that complex. The auto-generated stubs are commented to show which node each block came from. In general, ETL jobs start by initializing a few contexts so you can access Glue and PySpark functions.
In other words, the code pulls data from the catalog table, transforms it (in this case simply filling a value when it is blank), then saves it to one or more targets (S3 and RDS here). Finally, it calls job.commit().
Scripting these pipelines obviously gives more power and control than the UI, but it also means learning a bit of PySpark and familiarizing yourself with Glue-specific functions.
This is why using the UI can be so helpful -- there’s a mess of PySpark/Glue methods that we don’t yet know. Since we’re just getting started, I strongly suggest leveraging the UI, then peeking at the auto-generated Python. The UI also gives hints about how blocks of logic work, helping you decide what code you need to write yourself vs. what already has helper methods in Glue or Spark.
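To make that concrete, here’s roughly what the “fill parent_id from product_id” step looks like if you write it yourself in plain PySpark instead of relying on the generated FillMissingValues node -- a sketch that assumes a DynamicFrame (like S3bucket_node1 above), the glueContext from the generated script, and the column names used in the prose:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

# DynamicFrame -> Spark DataFrame so we can use plain column expressions.
df = S3bucket_node1.toDF()

# Where parent_id is null or blank, fall back to product_id.
df = df.withColumn(
    "parent_id",
    F.when(
        F.col("parent_id").isNull() | (F.trim(F.col("parent_id")) == ""),
        F.col("product_id"),
    ).otherwise(F.col("parent_id")),
)

# Back to a DynamicFrame so the write_dynamic_frame targets still accept it.
filled = DynamicFrame.fromDF(df, glueContext, "filled")

Neither approach is wrong; the helper node saves you the boilerplate, while the hand-written version makes the fallback logic explicit and easier to extend later.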
Conclusion
When processing huge CSV files (or any big data file), this step is often the bottleneck. Even if memory is managed well, blocking off a PHP process for long-running imports can be a problem. This is even more true when trying to do some sort of ETL against the data, as you’re just eating more CPU (potentially on production systems) to crunch huge volumes of data.
Glue solves that problem, enabling us to combine data from multiple sources, transform it, then re-save (load) it into many different formats and destinations.
Now we have all the data in a structured format (a relational DB in this case, but it could just as easily be DynamoDB, Redshift, or S3). The final step of our pipeline is actually “doing stuff” with this data. We’ll be using Lambda to handle all our business logic.
We aren’t fully done with ETL yet, though. We’ll be going back to our ETL pipeline as we work on our actual business logic so that this pipeline can do as much of the heavy lifting as possible.