Nicolas Monchy

Can you move your data pipelines to a serverless architecture? Should you? At GumGum we just built such a pipeline at scale using AWS. Here are our tips and our feedback on the limitations of such an architecture.

Serverless data pipelines?

We’re not going to talk about pipelines that use AWS Lambda. Serverless here means fully managed services, where the servers are abstracted away from the user. Think of services such as S3, IAM or DynamoDB: dealing with the AWS instances behind them is not your responsibility.

So our focus here is on ETL data pipelines that use S3, Glue, Athena and Redshift Spectrum.

In the industry, it's common to distinguish batch data pipelines from streaming ones. At GumGum, we use both, and they work great.


So the question arises: why would we also need serverless data pipelines?

Here are the drawbacks of our classic pipelines:

  1. Exposing new fields requires updating our processing code and database tables, which is tedious and hard to scale.
  2. We do not store all of our data in the database, because this would increase our storage costs. When we need ad hoc reporting on data that is not stored in the database, we write custom Spark jobs that read the raw data files. That is time-consuming and does not scale.

What if we could have serverless data pipelines that we don’t need to keep updating, and that would let us run SQL queries on all our data stored on S3?

The first serverless data pipeline we set up at GumGum receives an average of 20 TB of compressed files a day.

Which services to use?

AWS S3

To achieve this, we use Amazon S3 as the storage layer rather than a database such as Redshift. That’s the main difference from our classic data pipelines. The good news is that if you use AWS, your data is very likely already on S3.

AWS Glue

Glue is the central piece of this architecture. Amazon brands it as a “fully managed ETL service”, but here we are only interested in its Data Catalog, through the following features:

  1. Glue as a catalog for the tables: think of it as an extended Hive metastore that you don’t have to manage.
  2. Glue crawlers, which play a key role in inferring the schema. You set up a crawler to run periodically against the files under a specific S3 path. Based on the content of the files, the crawler creates the table, or updates it when new files have an updated schema, for example with a new column.

This service is very powerful because it lets us design a pipeline in a generic way that is not specific to a schema.

Amazon Athena & Amazon Redshift Spectrum

Both Athena and Spectrum are services to run SQL queries on files stored on S3. Both are integrated with the Glue catalog so you can query the tables created by your Glue crawlers right away.

You can use either, or both. Spectrum requires an existing Redshift cluster, so it’s more relevant if you already use Redshift. Otherwise, use Athena.

Behind the scenes, they use different engines developed by different AWS teams. As a result, they don’t behave exactly the same way. We’ll come back to that.

The serverless architecture

Using these 3 or 4 services, we get this simple architecture.

In a nutshell, your data is stored as files on S3. Glue crawlers scan these files and update the Glue Catalog. Athena and Spectrum let you query these files using the schema defined in the Glue catalog.

[Data format] Apache Parquet and Apache ORC

Apache Parquet and Apache ORC are two different column-oriented data formats. Most of your reporting queries don't need all the columns of a table, only a few of them. Using a column-oriented format is a big win with Athena and Spectrum. Only the referenced columns are read, so your queries will be much faster. They will also be cheaper, since you pay per GB scanned.
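To put rough numbers on the "pay per GB scanned" argument, here is a back-of-the-envelope sketch. It assumes a price of $5 per TB scanned and columns of roughly equal size. Both are assumptions, not figures from this article, so check current pricing for your region. It also ignores the extra compression Parquet/ORC usually add, which only widens the gap. The function name is hypothetical.

```python
# Illustrative estimate of per-query cost for row- vs column-oriented files.
# ASSUMPTIONS: $5 per TB scanned (check current regional pricing) and columns
# of roughly equal size; compression differences are ignored.
PRICE_PER_TB_USD = 5.0


def estimated_query_cost(table_tb: float, columns_read: int, total_columns: int,
                         columnar: bool) -> float:
    """Estimated cost in USD of one query over `table_tb` terabytes of data.

    A row-oriented format (e.g. Avro) has to be read in full, while a
    column-oriented format (Parquet/ORC) lets the engine read only the
    columns the query references.
    """
    if not 0 < columns_read <= total_columns:
        raise ValueError("columns_read must be between 1 and total_columns")
    fraction = columns_read / total_columns if columnar else 1.0
    return table_tb * fraction * PRICE_PER_TB_USD


# A query touching 3 of 30 columns over 1 TB of data.
row_cost = estimated_query_cost(1.0, 3, 30, columnar=False)  # 5.0
col_cost = estimated_query_cost(1.0, 3, 30, columnar=True)   # 0.5
print(row_cost, col_cost)
```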

One way to convert your data to Parquet files is with a Spark job. If your input data is structured, like the Avro files we use, you can write a generic Spark job that doesn’t need to be updated when your data schema changes.

[S3] Data partitioning

Partitioning your data is the single most important thing to do for performance and cost. All you have to do is encode partition keys and values in your S3 keys and, of course, filter on the right partitions at query time.

Let’s say we want to partition our data at a one-hour granularity. Historically, our S3 keys at GumGum were not compatible with the Glue partitioning format:

s3://my_bucket/logs/2019/03/27/16/

The Glue documentation recommends something like:

s3://my_bucket/logs/year=2019/month=03/day=27/hour=16/

In our opinion, using 4 partition keys makes queries too cumbersome to write. Think about the end users who are going to query these tables!

So we are now using only one partition key:

s3://my_bucket/logs/datetime=2019-03-27-16/
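A small helper can generate the single-key layout and map legacy keys onto it. This is a sketch only. `partition_prefix` and `legacy_to_partitioned` are hypothetical names, and the bucket and dataset are the example values from the paths shown here.

```python
from datetime import datetime


def partition_prefix(bucket: str, dataset: str, ts: datetime) -> str:
    """S3 prefix with a single Hive-style `datetime=` partition key.

    The value is zero-padded (YYYY-MM-DD-HH), so every partition value has the
    same width and string order matches chronological order.
    """
    return f"s3://{bucket}/{dataset}/datetime={ts:%Y-%m-%d-%H}/"


def legacy_to_partitioned(legacy_prefix: str) -> str:
    """Map a legacy key such as s3://b/logs/2019/03/27/16/ to the new layout."""
    head, year, month, day, hour = legacy_prefix.rstrip("/").rsplit("/", 4)
    return f"{head}/datetime={year}-{month}-{day}-{hour}/"


print(partition_prefix("my_bucket", "logs", datetime(2019, 3, 27, 16)))
# s3://my_bucket/logs/datetime=2019-03-27-16/
print(legacy_to_partitioned("s3://my_bucket/logs/2019/03/27/16/"))
# s3://my_bucket/logs/datetime=2019-03-27-16/
```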

The good thing is that you can still apply range filters in SQL, because these fixed-width, zero-padded values sort in chronological order. For example:

SELECT id
FROM my_table
WHERE datetime
BETWEEN '2019-03-27-16'
AND '2019-03-28-02'
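Why does a `BETWEEN` on a string partition column select the right hours? The sketch below mimics that filter with plain string comparisons over hourly partition values. It is a local simulation of the selection logic, not how Athena or Spectrum implement partition pruning, and the function name is hypothetical.

```python
from datetime import datetime, timedelta


def partitions_in_range(values, start, end):
    """Keep partition values v with start <= v <= end, like SQL BETWEEN.

    This is a plain string comparison. It selects the right hours only
    because the values are fixed-width and zero-padded.
    """
    return [v for v in values if start <= v <= end]


# Hourly partition values for two days, in the YYYY-MM-DD-HH format.
first = datetime(2019, 3, 27, 0)
values = [f"{first + timedelta(hours=h):%Y-%m-%d-%H}" for h in range(48)]

selected = partitions_in_range(values, "2019-03-27-16", "2019-03-28-02")
print(len(selected), selected[0], selected[-1])
# 11 2019-03-27-16 2019-03-28-02

# Without zero-padding the trick breaks: "9" sorts after "16" as a string.
assert "2019-3-27-9" > "2019-3-27-16"
```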


[Data Model] Schema complexity and schema evolution

However carefully you design it, your data model is going to change over time as your business does.

So what kind of schema evolution is supported by this architecture? Can we add a column? Delete a column? Alter another one?

The answer is not simple. First of all, it is important to understand that your schema is defined at 3 different levels:

  1. Your Parquet/ORC files hold a schema
  2. Your table defined in Glue has a schema
  3. Every partition of your table defined in Glue has a schema

And all these schemas need to be compatible so that Athena or Spectrum can behave consistently at query time.
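To make the table/partition distinction concrete, here is a minimal sketch that flags partitions whose schema no longer matches the table's. It assumes each schema is an ordered list of (name, type) pairs, which is roughly what the Glue catalog keeps in a table's or a partition's `StorageDescriptor.Columns`. The function name and sample data are hypothetical.

```python
from typing import Dict, List, Tuple

# A schema modeled as ordered (column_name, type) pairs.
Schema = List[Tuple[str, str]]


def drifted_partitions(table: Schema, partitions: Dict[str, Schema]) -> List[str]:
    """Partition values whose schema differs from the table schema."""
    return sorted(value for value, schema in partitions.items() if schema != table)


table = [("id", "string"), ("ts", "bigint"), ("country", "string")]
partitions = {
    # Crawled before `country` was added to the table.
    "2019-03-27-16": [("id", "string"), ("ts", "bigint")],
    "2019-03-27-17": table,
}
print(drifted_partitions(table, partitions))  # ['2019-03-27-16']
```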

One big differentiator here is whether you have a simple and flat schema or a schema with complex types and nested structures.

Real-world data is often nested, but nesting makes schema evolution harder. Say you want to add a sub-field to a nested struct. That is not just adding a column but altering an existing one, which is less well supported. We’ll come back to this.

[Glue] How to tune your crawlers

Remember that the table and each of its partitions have their own schema. You can use the `InheritFromTable` Glue option so that, whenever a crawler run updates the table schema, the schemas of all the partitions are updated to match it. In the console, this option is called “Update all new and existing partitions with metadata from the table”.

This guarantees that the table and partition schemas match. All you then have to care about is that this schema is compatible with the file schemas. In our tests, using that option greatly reduced the number of errors at query time.
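For crawlers created through the API rather than the console, the same option goes into the crawler's `Configuration` JSON as `Partitions.AddOrUpdateBehavior = "InheritFromTable"`. The sketch below only builds the keyword arguments for boto3's `glue.create_crawler()` and does not call AWS. The crawler name, role ARN, database and schedule are hypothetical placeholders.

```python
import json


def crawler_request(name: str, role_arn: str, database: str, s3_path: str,
                    schedule: str = "cron(15 * * * ? *)") -> dict:
    """Keyword arguments for boto3's glue.create_crawler().

    The Configuration JSON makes partitions inherit the table schema, the API
    equivalent of "Update all new and existing partitions with metadata from
    the table" in the console.
    """
    configuration = {
        "Version": 1.0,
        "CrawlerOutput": {
            "Partitions": {"AddOrUpdateBehavior": "InheritFromTable"},
        },
    }
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
        "Schedule": schedule,  # hourly at minute 15, an arbitrary example
        "Configuration": json.dumps(configuration),
    }


request = crawler_request(
    name="logs-crawler",                                    # hypothetical
    role_arn="arn:aws:iam::123456789012:role/GlueCrawler",  # hypothetical
    database="analytics",                                   # hypothetical
    s3_path="s3://my_bucket/logs/",
)
# With AWS credentials configured:
# boto3.client("glue").create_crawler(**request)
```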

As your dataset grows, crawler runs take longer and longer. For our data pipeline with 20 TB of new data a day, after 5 months a single run was taking about 3 hours, so the queryable data was getting less and less fresh. While the Glue team works on improving that, we use the “exclude trick”.

A Glue crawler lets you exclude S3 key patterns. We use this to exclude paths that have already been scanned, since we know no new files will be added to them. For example, to exclude all files from January to April 2019, you can use this exclude pattern:

datetime=2019-0[1-4]-**


At the next run, the crawler skips all S3 keys matching this pattern. This cut our crawler run times from hours to about 10 minutes.
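Maintaining the exclude list by hand gets tedious, so it can be generated. The sketch below emits one pattern per fully processed month instead of a character class like `[1-4]`. These patterns would go in the `Exclusions` list of the crawler's S3 target, and they are relative to its include path. The function name is hypothetical, and `fnmatch` is only a rough local stand-in for Glue's glob matching. Its `*` also crosses `/`, as Glue's `**` does.

```python
from datetime import date
from fnmatch import fnmatch
from typing import List


def month_exclusions(first: date, last: date) -> List[str]:
    """One Glue exclude pattern per month from `first` to `last` (inclusive).

    Only pass months that are fully processed, i.e. no new files will land there.
    """
    patterns = []
    year, month = first.year, first.month
    while (year, month) <= (last.year, last.month):
        patterns.append(f"datetime={year:04d}-{month:02d}-**")
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return patterns


excludes = month_exclusions(date(2019, 1, 1), date(2019, 4, 1))
print(excludes)
# ['datetime=2019-01-**', 'datetime=2019-02-**', 'datetime=2019-03-**', 'datetime=2019-04-**']

key = "datetime=2019-03-27-16/part-0000.avro"
print(any(fnmatch(key, p) for p in excludes))  # True
```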

[Limitations] Athena and Redshift Spectrum inconsistencies

As mentioned before, Athena and Spectrum are different engines developed by two different AWS teams. The downside is that they sometimes behave differently in unexpected ways.

We won’t give an exhaustive list of differences, only a few that we ran into. Keep in mind, though, that there is no guarantee that a given Athena feature is supported by Spectrum, and vice versa.

What if you want to query Avro files with a nested schema? You can do it from Athena but not from Spectrum. What if you want to run a `SELECT *` query on a table with a nested schema? Athena supports it but Spectrum doesn’t.

The good news is that, although they differ, they tend to converge. Athena didn't have views in the past, but now it does. And we couldn’t query struct, map and array fields from Spectrum, but that is now supported.

[Limitations] At scale, files conversion to Parquet/ORC can be expensive

The first serverless pipeline we built scans about 20 TB of data a day. Using Spark to convert those files into Parquet files works great but ends up costing a lot of money at scale (we estimated about $6k a month in our case).

Ingesting all these files and writing the output Parquet files from EMR to S3 are both time-consuming, especially the latter. S3 is not a filesystem and has no efficient rename operation. When temporary files are “moved”, they are actually copied and then deleted, which takes a lot of time at scale.

Note: this was before EMR released its proprietary EMRFS S3-optimized committer for Parquet files. Using this committer speeds up that last step considerably, because it avoids the rename operations.
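What a “rename” amounts to on S3 can be sketched with two boto3 client calls: a server-side copy to the new key, then a delete of the old one. This illustrates the mechanism only. It is not how EMRFS or Spark committers are implemented, and the helper name and keys are hypothetical. A single `copy_object` call is also limited to objects up to 5 GB. Larger objects need a multipart copy.

```python
def s3_rename(s3, bucket: str, src_key: str, dst_key: str) -> None:
    """'Rename' an S3 object. S3 has no rename, so copy it, then delete the source.

    `s3` is expected to be a boto3 S3 client (boto3.client("s3")). That is two
    requests per file, and the copy rewrites the object's bytes server-side,
    which is why committing many output files this way is slow.
    """
    s3.copy_object(Bucket=bucket, Key=dst_key,
                   CopySource={"Bucket": bucket, "Key": src_key})
    s3.delete_object(Bucket=bucket, Key=src_key)


# Usage, with AWS credentials configured (bucket and keys are hypothetical):
# import boto3
# s3_rename(boto3.client("s3"), "my_bucket",
#           "output/_temporary/part-0000.parquet", "output/part-0000.parquet")
```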

In the meantime, for this specific pipeline, we simply paused the Spark job and queried the raw Avro files directly. This shifts the cost from pre-processing into a better storage format to query time.
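Whether that trade-off pays off depends on how much you query. Here is a rough break-even sketch. Only the ~$6k/month conversion estimate comes from this article. The $5/TB price and the per-query scan sizes are assumptions to replace with your own measurements, and the function name is hypothetical.

```python
# Convert to Parquet up front, or pay more per query by scanning raw Avro?
# ASSUMPTION: $5 per TB scanned; check current pricing for your region.
PRICE_PER_TB_USD = 5.0


def break_even_queries(conversion_cost_usd: float, raw_tb_per_query: float,
                       parquet_tb_per_query: float) -> float:
    """Queries per period above which converting to Parquet pays for itself."""
    saving_per_query = (raw_tb_per_query - parquet_tb_per_query) * PRICE_PER_TB_USD
    if saving_per_query <= 0:
        return float("inf")  # conversion never pays off on query cost alone
    return conversion_cost_usd / saving_per_query


# Hypothetical workload: each query scans 2 TB of raw Avro, or 0.1 TB of Parquet.
print(break_even_queries(6000, 2.0, 0.1))  # about 632 queries a month
```

Below that query volume, querying the raw files is cheaper under these assumptions. Above it, the Spark conversion earns back its cost.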

[Limitations] Schema evolution support

If you limit your schema evolution to adding new columns at the end of simple/flat schemas, you are safe. You can query both old-schema and new-schema files from the same table, in both Athena and Spectrum.

For complex/nested schemas, it’s more complicated. Adding sub-fields to Avro files is supported when querying from Athena but not from Spectrum. For Parquet/ORC files, it is supported when querying from Redshift Spectrum but not from Athena. This adds complexity (which service should I use for which use case?), so hopefully schema evolution support will improve with time.
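The “safe” rule for flat schemas can be expressed as a simple check: the new schema must start with every old column, unchanged and in the same position. The sketch below also shows why a new sub-field is not an append. The struct's type string changes, so an existing column has been altered. Schemas are again modeled as ordered (name, type) pairs, and the helper name and sample columns are hypothetical.

```python
from typing import List, Tuple

Schema = List[Tuple[str, str]]  # ordered (column_name, type) pairs


def is_append_only(old: Schema, new: Schema) -> bool:
    """True if `new` only appends columns after the existing ones.

    Every existing column must keep its position, name and type.
    """
    return len(new) >= len(old) and new[:len(old)] == old


v1 = [("id", "string"), ("geo", "struct<country:string>")]
v2 = v1 + [("ts", "bigint")]                                             # column appended
v3 = [("id", "string"), ("geo", "struct<country:string,city:string>")]  # sub-field added

print(is_append_only(v1, v2))  # True
print(is_append_only(v1, v3))  # False: `geo` changed type, so it was altered
```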

Also, keep in mind that you can use the Spark jobs that convert your data into Parquet files to explode and flatten your schemas at the same time.
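Flattening turns nested fields into top-level columns, so later additions show up as new columns rather than altered structs. In a Spark job this would typically be a `select` of nested fields under new aliases. The pure-Python sketch below only illustrates the naming scheme on plain dicts. The `geo_city_name`-style naming and the function name are hypothetical choices.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], sep: str = "_", prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into top-level keys, e.g. geo.city -> geo_city.

    Lists are left as-is. Exploding them into one row per element is a
    separate step (explode() in Spark).
    """
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, sep, name))
        else:
            flat[name] = value
    return flat


event = {"id": "a1", "geo": {"country": "US", "city": {"name": "LA"}}, "tags": ["x"]}
print(flatten(event))
# {'id': 'a1', 'geo_country': 'US', 'geo_city_name': 'LA', 'tags': ['x']}
```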

[Limitations] The cloud capacity is (not) infinite

For Athena, you can contact AWS Support to increase your maximum number of concurrent queries, or how long a query can run before it times out. That's useful if you have peaks of activity or long-running ad hoc queries. And because it’s a managed service, scaling is not your responsibility.

However, that doesn’t mean that the capacity AWS is willing to dedicate to you is infinite. We had cases where queries were accepted but wouldn’t actually start until the previous ones had finished. We also got error messages stating that Athena was out of capacity.
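One way to see “accepted but not started” from the client side is to watch the query's state while polling. In Athena, a query in that situation reports `QUEUED` before `RUNNING`. The sketch below submits a query with boto3's Athena client and counts the polls that found it still queued. The helper name, database and result bucket are hypothetical. The client is passed in, so no AWS call happens until you supply one.

```python
import time
from typing import Callable, Tuple

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def run_athena_query(athena, sql: str, database: str, output_location: str,
                     poll_seconds: float = 2.0, max_polls: int = 1800,
                     sleep: Callable[[float], None] = time.sleep) -> Tuple[str, str, int]:
    """Submit a query, poll until it reaches a terminal state, and report queuing.

    `athena` is expected to be a boto3 Athena client (boto3.client("athena")).
    Returns (query_id, final_state, queued_polls), where queued_polls counts the
    status checks that found the query accepted but still QUEUED.
    """
    query_id = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    queued_polls = 0
    for _ in range(max_polls):
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return query_id, state, queued_polls
        if state == "QUEUED":
            queued_polls += 1
        sleep(poll_seconds)
    raise TimeoutError(f"query {query_id} not finished after {max_polls} polls")


# Usage, with AWS credentials configured (database and bucket are hypothetical):
# import boto3
# run_athena_query(boto3.client("athena"), "SELECT 1", "analytics",
#                  "s3://my_bucket/athena-results/")
```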

[Limitations] The black box effect of managed services

Fully managed services act as a black box: you can only see what the provider lets you see. That’s great because you have less responsibility, but sometimes you would like more insight.

When one of your Spark jobs fails, you check the logs, investigate, and try to fix or work around the issue. But when one of your Athena queries hits an internal error, all you can do is contact AWS Support. And if you hit a Glue crawler bug, all you can do is open a ticket and wait for it to be fixed. AWS Support proved to be responsive and efficient, but you won’t get any ETA on bug fixes or feature requests.

The good bits & conclusion

Most of this article is about the limitations of this architecture, but it’s important to emphasize that it works fine! We now have several such serverless pipelines in production. They are affordable and very low maintenance.

Most importantly, they meet our 2 primary goals:

  1. no update required when the upstream schemas are updated with new columns
  2. ability to run SQL queries on our whole dataset

As discussed, Glue, Athena and Spectrum still have a few edge cases and limitations, but these serverless pipelines are a great way to make your data more accessible to anyone who knows SQL or can use a BI tool.

If you’d rather watch a video than read the article, here is the talk's version, given at the London AWS User Group UK meetup.
