Batch Identification for Horizontally Distributed Data


Abstract

The purpose of this document is to provide a recommended approach for leveraging an additional “folder” in the Simple Storage Service (s3) storage layer[1].  This approach provides multiple benefits, the three biggest being non-disruptive logical updates, rerun-ability, and batch timestamp visibility, all while minimizing the impact to consumers of the data.  

Disclaimers

  • AWS s3 has a flat structure, but for the sake of discussion and ease of understanding, this document speaks to s3 in hierarchical terms, like a traditional file system
  • This document does not intend to speak to partitioning approaches, sub-partitioning, or general best practices or usage patterns for s3.  Any discussion herein regarding those items is for the purposes of illustration and explanation
  • This approach works in HDFS just as it does in s3.  However, s3 offers inexpensive, effectively “infinite” storage, which is why the recommendation here is specifically for s3.  Implementing this approach in HDFS requires additional consideration (specifically, regular data deletion, because space is finite at any point in time)

Approach

We will consider this proposal under the assumption that data will be stored in s3, organized by table name, and partitioned (primarily) by a representative date.  There may be exceptions to this approach, and in certain situations sub-partitioning may be leveraged, but for the purposes of illustration within this document, we can assume these things will hold true.  An example for the fictitious customer table for customers who signed up on December 10, 2015 is provided below:

s3n://abc/def/customer.gz/dt=20151210/

A series of files (the count determined by the number of reducers in the MapReduce job that created them) would exist at that location.  For the purposes of illustration, we’ll assume there are always five files per day, stored at locations like the one shown above, and that simple gzip compression is being used.  Thus, the following would be a complete representation of two days’ worth of customer files:

s3n://abc/def/customer.gz/dt=20151209/part-r-00000.gz

s3n://abc/def/customer.gz/dt=20151209/part-r-00001.gz

s3n://abc/def/customer.gz/dt=20151209/part-r-00002.gz

s3n://abc/def/customer.gz/dt=20151209/part-r-00003.gz

s3n://abc/def/customer.gz/dt=20151209/part-r-00004.gz

s3n://abc/def/customer.gz/dt=20151210/part-r-00000.gz

s3n://abc/def/customer.gz/dt=20151210/part-r-00001.gz

s3n://abc/def/customer.gz/dt=20151210/part-r-00002.gz

s3n://abc/def/customer.gz/dt=20151210/part-r-00003.gz

s3n://abc/def/customer.gz/dt=20151210/part-r-00004.gz

This is a very common and straightforward approach to data distribution, storage and partitioning within HDFS and/or s3.  The downfall of this approach begins with rerun-ability (which includes processing for “logical updates”).  Imagine a scenario where the 20151210 data needs to be rerun.  Some of the steps to perform that action may be (a rough sketch of these steps as a script follows the list):

  • Identify which data needs to be deleted/restated
  • Alert users that they cannot access the data that will be deleted/restated
  • Adjust Hive to no longer expect partitions in the deleted directories
  • Delete aforementioned data
  • Rerun ETL job
  • Add new location and hive partition information to hive metastore
  • Alert users that they can again consume data
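
To make the burden concrete, below is a rough sketch of what the middle of that list might look like when scripted; the hive CLI and hadoop fs invocations, the ETL script name, and the paths are all assumptions for illustration, not a prescribed implementation.

import subprocess

table = "customer"
dt = "20151210"
location = f"s3n://abc/def/customer.gz/dt={dt}/"   # illustrative path

def run(cmd):
    """Run a shell command and fail loudly if it does not succeed."""
    subprocess.run(cmd, shell=True, check=True)

# 1. Tell Hive to stop expecting the partition that is about to disappear
run(f'hive -e "ALTER TABLE {table} DROP IF EXISTS PARTITION (dt={dt});"')

# 2. Delete the old data (users querying during this window see nothing, or errors)
run(f"hadoop fs -rm -r {location}")

# 3. Rerun the ETL job that repopulates the partition (job name is hypothetical)
run(f"./run_customer_etl.sh --dt {dt}")

# 4. Re-register the partition in the hive metastore
run(f'hive -e "ALTER TABLE {table} ADD PARTITION (dt={dt}) LOCATION \'{location}\';"')

# Steps 1, 2, and 4 bracket a window in which consumers must be told not to query the data.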

The point here isn’t whether each one of these steps is inherently necessary for each rerun (it’s fair to say that certain items could be automated).  Instead, the point is to illustrate that a number of steps are necessary, and some of those steps are likely manual, which means either things will be forgotten or the act itself will inevitably take longer than is necessary.  Inescapably, users who are leveraging that data will be negatively impacted.

Prior to proceeding, a quick note around batch timestamp visibility: in traditional RDBMS systems, it has long been a best practice to include a timestamp “footprint” column[2] (or multiple columns) representing the last time a row was inserted or updated.  There are many, many benefits to this approach, and anyone who has spent a significant amount of time in the data warehousing space can likely recite multiple experiences where footprint fields either saved them or could have, had they been implemented correctly.

At its lowest level, batch_id is a timestamp.  It is the epoch timestamp of when the process that wrote the data started.  Our earlier data structure example from December 10, 2015, this time with batch_id applied, is shown below:

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00000.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00001.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00002.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00003.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00004.gz
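
For concreteness, a minimal sketch of how a writing job might derive its batch_id and output prefix is shown below; the bucket, path layout, and helper name are illustrative, and the important detail is that the epoch value is captured once, at process start, and reused for every file the batch writes.

import time

# Capture the epoch once, when the process starts; every file written by this
# batch shares the same value, no matter how long the job runs.
batch_id = int(time.time())

def output_prefix(table, dt):
    """Build the s3 prefix for one partition of one batch (paths are illustrative)."""
    return f"s3n://abc/def/{table}.gz/dt={dt}/batch_id={batch_id}/"

print(output_prefix("customer", "20151210"))
# e.g. s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/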

Adding the batch_id folder has no negative impact on users, who continue to access the customer table and its associated partitions (in this case, the 20151210 date) exactly the same way.  The benefits, though, can quickly be seen if we need to rerun the 20151210 customer data, after which our data looks like:

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00000.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00001.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00002.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00003.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608/part-r-00004.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/part-r-00000.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/part-r-00001.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/part-r-00002.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/part-r-00003.gz

s3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/part-r-00004.gz

The key now is that the only thing that needs to be executed is an ALTER TABLE statement, as shown below:

ALTER TABLE customer PARTITION (dt=20151210) SET LOCATION 's3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/';

If we go back to the original steps for rerunning, it now looks like this:

  • Identify which data needs to be deleted/restated (no longer necessary)
  • Alert users that they cannot access the data that will be deleted/restated (no longer necessary)
  • Adjust Hive to no longer expect partitions in the deleted directories (no longer necessary)
  • Delete aforementioned data (no longer necessary)
  • Rerun ETL job
  • Alter the partition location information in the hive metastore
  • Alert users that they can again consume data (no longer necessary)

Not only is the process incredibly simplified, but what remains is extremely automatable.  
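
As a rough illustration of that automation, once a rerun finishes writing under its new batch_id prefix, the entire publish step can collapse to rendering and executing a single statement.  The sketch below assumes the hive CLI is available and uses illustrative table and path names.

import subprocess

def publish_partition(table, dt, batch_id):
    """Point the existing Hive partition at the freshly written batch_id prefix."""
    location = f"s3n://abc/def/{table}.gz/dt={dt}/batch_id={batch_id}/"  # illustrative path
    statement = (
        f"ALTER TABLE {table} PARTITION (dt={dt}) "
        f"SET LOCATION '{location}';"
    )
    # Execute via the hive CLI; the same statement could be issued through
    # beeline, JDBC, or whatever client the ETL framework already uses.
    subprocess.run(["hive", "-e", statement], check=True)

# After the rerun for 20151210 completes under batch_id 1449847129:
publish_partition("customer", "20151210", 1449847129)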

Arguably most important, the users of the data are not impacted.  One could argue that users should be alerted when a rerun is necessary, but more realistically, this isn’t only about reruns (i.e. “logical updates”), and even if it were, many users may not care about what caused the rerun.  For example, perhaps the gender column was accidentally not populated during the initial execution of the customer data in our ongoing example.  For users who don’t need that column, asking them to put their work on hold so that a column they don’t care about can be fixed is a difficult ask.  With this approach, they can continue to work with zero stoppage while the issue is resolved.

More to the point, though, are logical updates.  Let’s imagine that the customer data contains addresses (a data modeling mistake in a fully normalized RDBMS system, but arguably a reasonable approach for a denormalized s3 data set).  For each address change, the data may need to be restated (likely not a great design, but it allows us to continue with the same illustration).  It would be extremely unreasonable to ask the users to avoid certain data (in this case, data that spans multiple partitions) every day so that it could be restated[3].  Under the original approach, that’s precisely what would have to happen.  Under the batch_id approach, users are completely shielded from any disruption.

What the users do need to be aware of is exactly what they needed to be aware of in a traditional RDBMS scenario that involves updates: you may execute a query at 2:45 PM and get a different result than a query you run at 2:46 PM on the same day (because the underlying data may have been updated within that minute).  

Finally, one additional benefit to the batch_id approach is that old data is still accessible, if for some reason it is needed (for example, troubleshooting efforts that include identifying what changed between executions of a given table).  In that scenario, it does require some manual intervention (physically pointing your code at the batch_id that is not the current partition), but the very essence of that type of request is manual (or it’s a poor design) so some minimal and simple manual intervention is a very reasonable requirement.
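
As one example of that manual intervention, the batch_id prefixes that exist for a given partition can simply be listed and compared.  A small sketch using boto3 is shown below; the bucket and prefix names are illustrative, and pagination is omitted for brevity.

import boto3

s3 = boto3.client("s3")

def list_batch_ids(bucket, table_prefix, dt):
    """Return every batch_id that exists under one dt partition, oldest first."""
    prefix = f"{table_prefix}/dt={dt}/"
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/")
    batch_ids = []
    for cp in resp.get("CommonPrefixes", []):
        # CommonPrefixes look like 'def/customer.gz/dt=20151210/batch_id=1449784608/'
        value = cp["Prefix"].rstrip("/").split("batch_id=")[-1]
        batch_ids.append(int(value))
    return sorted(batch_ids)

# Everything except the last entry is an older execution available for comparison.
print(list_batch_ids("abc", "def/customer.gz", "20151210"))
# e.g. [1449784608, 1449847129]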

Data Deletion and Costs

Detractors of this approach are quick to point out that this will lead to large amounts of “dead” data in the system, which carries with it a cost.  There are two points of discussion to combat that argument:

  1. Who cares?
      • s3 storage is extremely cheap (https://aws.amazon.com/s3/pricing/), especially as your overall storage and s3 data footprint increases.  If a terabyte of compressed data costs $20/month, many organizations will find that the cost of performing the deletes is actually more expensive than simply leaving the data where it is
  2. Decoupled cleanup
      • A script can be written to run periodically (daily, weekly, monthly) that accesses the hive metastore and identifies partition locations older than some arbitrary amount of time (say, a week) that are no longer referenced by a hive partition.  In the previous data illustration, an example would be the original s3n://abc/def/customer.gz/dt=20151210/batch_id=1449784608 data objects.  A minimal sketch of such a script follows this list
      • If a decoupled cleanup is desirable, make sure there is a way for data objects to “opt out”, which is to say that it may be desirable to keep the “before” snapshot of the data available for some reason.  Simply being able to protect data from the decoupled automated cleanup process described above is a recommended practice, no matter how trivially it is implemented
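
A minimal sketch of such a cleanup is shown below.  It assumes the current partition location has already been retrieved from the hive metastore (for example, via DESCRIBE FORMATTED), uses an illustrative “.keep” marker object for the opt-out, and omits pagination for brevity; the bucket names, prefixes, and retention window are all assumptions.

import time
import boto3

s3 = boto3.client("s3")
RETENTION_SECONDS = 7 * 24 * 3600  # keep non-current batches for a week (arbitrary)

def cleanup_partition(bucket, dt_prefix, current_location):
    """Delete batch_id prefixes under one dt partition that are old, not current,
    and not explicitly protected by a .keep marker object."""
    now = int(time.time())
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=dt_prefix, Delimiter="/")
    for cp in resp.get("CommonPrefixes", []):
        prefix = cp["Prefix"]            # e.g. def/customer.gz/dt=20151210/batch_id=1449784608/
        # The key prefix (no bucket/scheme) is a substring of the current s3n location.
        if prefix in current_location:
            continue                     # still referenced by the hive metastore
        batch_id = int(prefix.rstrip("/").split("batch_id=")[-1])
        if now - batch_id < RETENTION_SECONDS:
            continue                     # too young to delete
        # Opt-out: a marker object protects the "before" snapshot from cleanup.
        marker = s3.list_objects_v2(Bucket=bucket, Prefix=prefix + ".keep", MaxKeys=1)
        if marker.get("KeyCount", 0) > 0:
            continue
        # Delete every object under the stale batch_id prefix (pagination omitted).
        stale = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        keys = [{"Key": obj["Key"]} for obj in stale.get("Contents", [])]
        if keys:
            s3.delete_objects(Bucket=bucket, Delete={"Objects": keys})

# current_location would come from the hive metastore for dt=20151210
cleanup_partition(
    bucket="abc",
    dt_prefix="def/customer.gz/dt=20151210/",
    current_location="s3n://abc/def/customer.gz/dt=20151210/batch_id=1449847129/",
)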

Why the Epoch?

The decision to leverage the epoch timestamp for the batch_id is important for two simple reasons.  First, it provides the functionality of a traditional ins_upd_ts (insert/update timestamp) column in the RDBMS sense, so it provides many of the same benefits.

Second, it guarantees that each batch_id will be larger than the previous one.  If, for example, the alter table command fails for some reason, an engineer can quickly identify which directory is the latest, and correcting the issue is straightforward.

Occasionally, groups may decide to leverage something other than the epoch timestamp; most frequently, that is another datetime format.  Doing so would change the batch_id from:

batch_id=1449788020

to:

batch_id=201512102253

While the second (which is in YYYYMMDDHH24MI format) is more human readable, the benefits beyond that are disputable.  If, for instance, the batch_id is being used programmatically (which is never recommended, but is possible), the argument for the latter over the former is moot.  If it is for human consumption and readability, it is reasonable for the user to paste the batch_id value into http://www.epochconverter.com/ or a similar tool to retrieve a formatted date.  It is the experience of the author that adding a second date in a common format can lead to confusion, and it often tempts individuals who are not intimately familiar with the batch_id concept to leverage the datetime-style batch_id directly, which, again, is not its intent and can lead to undesired results (for example, someone physically pointing at an s3 location that is no longer referenced by the hive metastore).  Leaving it as the epoch provides the right level of (or lack of) clarity to help protect against incorrect usage of the s3 data.

The point, then, is that while it is entirely possible to leverage something other than the epoch timestamp and get the other benefits described within this document, it is not recommended to do so.  


Appendix I

Parameterized Example of Alter Table Command

ALTER TABLE ${hiveTable} PARTITION (${partitionID}=${partitionID_value}) SET LOCATION '${s3HivePath}/${hiveTable}/${partitionID}=${partitionID_value}/batch_id=${batch_id}/';


[1] In Amazon S3, buckets and objects are the primary resources, where objects are stored in buckets. Amazon S3 has a flat structure with no hierarchy like you would see in a typical file system. However, for the sake of organizational simplicity, the Amazon S3 console supports the folder concept as a means of grouping objects (http://docs.aws.amazon.com/AmazonS3/latest/UG/FolderOperations.html)

[2] The only reasonable argument against including insert/update timestamps was the additional space consumed by adding 8+ bytes to each row in an RDBMS.  This was often disregarded because of the benefits the approach provided, but the alternative opinion is included for completeness of the overall discussion

[3] One may argue that the logical updates would happen as part of an overnight ETL, but that short-sighted approach means the overnight batch can never move to a mini-batch (e.g. hourly) cadence, and any delays that cause the overnight run to slip into business hours would inevitably cause disruption for the users

Categories: Immutability, Architecture, ACID Tags: #updates, #s3, #aws