ETL Parallelization: Loading Fact Tables Using the Modulo Shredder

However many performance optimizations you put in place, you may eventually run up against a bottleneck: you want a shorter load time, but each further optimization fails to deliver the improvement you require.

If you’re in this situation, consider parallelizing the most time-consuming elements of the ETL process. For many data warehousing applications, this is likely to be the loading of one or more fact tables.

Parallelization raises additional issues that you may not have had to consider in your ETL process to date. These may include:

  • Deciding how to divide the data to be loaded into parallel load streams
  • Preventing data duplicated in multiple load streams from being loaded more than once
  • Ensuring that all parallel load streams complete
  • Handling gracefully the scenario where one or more streams fail to load
  • Avoiding contention between parallel load streams

I’ll attempt to address these challenges and suggest possible solutions.

Sometimes we are fortunate: multiple, identical data sources become available almost simultaneously, and their number matches the degree of parallelization (DOP) that we wish to achieve. However, this is rarely the case.

More commonly, data arrives either as fewer, larger sources than the DOP, or as many more. Furthermore, we may wish to change the DOP over time, either because the performance metrics of our existing DOP suggest increasing or decreasing it, or because the hardware resources available have changed.

Yet another challenge is balancing the load across the parallel streams. If we divide the data purely on a business value, we have to ensure that the values are distributed evenly enough for the resulting streams to match the DOP we wish to achieve. This is rarely possible.
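To make the balancing problem concrete, here is a minimal sketch that measures how uneven the streams become when rows are divided by a business value. The rows, the region names and the helper functions are all hypothetical and chosen purely for illustration.

```python
from collections import Counter

def stream_sizes(rows, key):
    """Count how many rows each distinct key value would send to its own stream."""
    return Counter(key(row) for row in rows)

def imbalance(sizes):
    """Ratio of the largest stream to the mean stream size (1.0 means perfectly even)."""
    counts = list(sizes.values())
    return max(counts) / (sum(counts) / len(counts))

# Hypothetical sales rows as (order_id, region); 70% of the orders come from one region.
rows = [
    (i, "EMEA" if i % 10 < 7 else "APAC" if i % 10 < 9 else "LATAM")
    for i in range(1000)
]

by_region = stream_sizes(rows, key=lambda row: row[1])
skew = imbalance(by_region)  # the EMEA stream carries 2.1 times the mean load
```

With three streams split this way, the largest stream holds 700 of the 1,000 rows, so the other two streams finish early and sit idle while it completes.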

Microsoft Balanced Data Distributor

For SQL Server Integration Services, Microsoft now provide a Balanced Data Distributor Transformation. As the documentation indicates, this transformation takes a single stream of input and outputs it to one or more further transformations or destinations, using a round-robin algorithm on the data buffers. This is ideal for quickly implementing parallelization where the subsequent data pipeline is otherwise the bottleneck. What it doesn’t deal with is the scenario where the source pipeline is the bottleneck. You may have noticed that no matter how narrow your tabular flat file is, it’s very difficult to read it at more than 80,000-100,000 rows per second.

To tackle this we need to parallelize not just the dataflow, but the source itself. But herein lies a conundrum: for optimum performance, we want to parse a source file only once. If we have to parse the data more than once, then we want subsequent parses to be on a reduced, more efficient data format.
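The round-robin idea can be sketched in a few lines of Python. This is only an illustration of distributing whole buffers in turn, not Microsoft's implementation; the function names and the buffer size are assumptions.

```python
from itertools import islice

def buffers(rows, buffer_size):
    """Group an iterable of rows into lists of at most buffer_size rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, buffer_size))
        if not chunk:
            return
        yield chunk

def round_robin(rows, outputs, buffer_size):
    """Hand whole buffers to each output in turn; returns one list of rows per output."""
    streams = [[] for _ in range(outputs)]
    for i, chunk in enumerate(buffers(rows, buffer_size)):
        streams[i % outputs].extend(chunk)
    return streams

# Ten rows, buffers of two rows, three outputs.
streams = round_robin(range(10), outputs=3, buffer_size=2)
```

Note that a single reader still produces every buffer here, which is why this approach helps only when the work after the source is the slow part.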

One solution that deals with these problems is what I call the “Modulo Shredder”:

Modulo Shredder

The Modulo Shredder is a pattern I’ve devised for processing one or more large files and breaking the data into roughly even-sized files that match in number the DOP we wish to achieve and are each much smaller than the source.

The idea is that instead of processing a single large source file using one parse and one dataflow, we divide the processing into two stages:

The first stage parses the large source file and writes its content to n files (where n is the DOP we wish to achieve). The output files are not in the format of the source file, but in the most efficient and digestible format for your choice of ETL tool. For example, in the case of SQL Server Integration Services, the best output format for subsequent import would be the raw file format. To produce output files that each contain approximately the same number of rows, we calculate the value of an integer column modulo n and write each row to the file numbered with the result. Any integer column will do, provided its values give a reasonably balanced distribution under modulo n.
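A minimal sketch of the first stage in Python follows. It assumes a CSV source with a header row and an integer key column, and it writes CSV output as a stand-in for a tool-native format such as the SSIS raw file. The function name shred, the part_<k>.csv naming and the parameters are hypothetical.

```python
import csv
from pathlib import Path

def shred(source_path, out_dir, n, key_column):
    """Parse source_path once and write each row to part_<k>.csv, where k = key % n."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = [out_dir / f"part_{k}.csv" for k in range(n)]
    files = [p.open("w", newline="") for p in paths]
    try:
        writers = [csv.writer(f) for f in files]
        with open(source_path, newline="") as src:
            reader = csv.reader(src)
            header = next(reader)
            key_index = header.index(key_column)
            for writer in writers:
                writer.writerow(header)  # every output file keeps the header
            for row in reader:
                # The modulo result selects the output file for this row.
                writers[int(row[key_index]) % n].writerow(row)
    finally:
        for f in files:
            f.close()
    return paths
```

For example, shred("sales.csv", "shredded", 8, "order_id") would produce eight files from a hypothetical sales.csv. Python's % operator returns a non-negative result for a positive n even when the key is negative, so every row maps to a valid file.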

The second stage consists of your ETL data flow component as it was before, with a few minor modifications. Firstly, it needs to use one of the raw files as its data source. Secondly, because the component itself decides which file to work upon (instead of being handed files by the master thread/process), it needs an input parameter defining which of the n files to take. Because we’re using a modulo to divide the files up, we know that the possible values for this parameter are 0 to n - 1. So if we want to create 8 separate dataflows, we’ll need 8 separate input files, we’ll use modulo 8 to split the original file up, and each file will contain the rows for which the modulo 8 calculation returns one particular value in the range 0 to 7.

The second stage component is executed n times in parallel following Stage 1, with each parallel execution given a different modulo result in the range 0 to n - 1. This is why the second stage should be implemented as a separate component: by doing so, we can supply the component with the modulo result for that instance as a parameter.
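The parameterised second stage might be sketched as follows. The sketch assumes the first stage has left files named part_0.csv to part_(n-1).csv with a header row in a directory (hypothetical names), and it uses threads for brevity; a real load would more likely run separate processes or package executions. Counting rows stands in for the actual transform-and-load work.

```python
import csv
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def load_partition(in_dir, k):
    """Stage 2 for one stream: read part_<k>.csv and return the number of rows handled."""
    with open(Path(in_dir) / f"part_{k}.csv", newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        return sum(1 for _ in reader)  # placeholder for the real transform and load

def run_stage_two(in_dir, n):
    """Run n instances in parallel, one per modulo result 0..n-1; return rows per stream."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(load_partition, in_dir, k) for k in range(n)]
        # result() re-raises any exception from a worker, so a failed stream is not missed.
        return [future.result() for future in futures]
```

Because each instance receives a distinct value of k, no two instances read the same file, and comparing the returned list against n entries confirms that every stream completed.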

The Modulo Shredder pattern deals with a number of the problems originally identified. It divides the data. It prevents any row being sent down two or more pipelines (as long as you don’t give two or more instances of the second stage component the same modulo result). And it avoids contention between load streams (provided your second stage code doesn’t take exclusive locks on resources shared across instances of the second stage component). But how does it ensure that all parallel load streams complete and, conversely, that if a failure occurs the package fails gracefully and to a known state?

To deal with this last requirement, use the transactional capability of your ETL tool to wrap the parallel second-stage component executions in a single transaction: if one instance fails, the entire transaction is rolled back. This can of course only be done where the destinations support transactions; otherwise you have to implement an all-or-nothing algorithm yourself, but it can be done.
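The all-or-nothing behaviour can be illustrated with a small sketch using Python's built-in sqlite3 module. For simplicity it loads the partitions one after another on a single connection, rather than in parallel under a transaction coordinated by an ETL tool; the fact_sales table and the sample rows are hypothetical.

```python
import sqlite3

def load_all_or_nothing(conn, partitions):
    """Insert every partition's rows into fact_sales inside one transaction.

    Returns True if everything was committed, False if it was rolled back.
    """
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            for rows in partitions:
                conn.executemany(
                    "INSERT INTO fact_sales (id, amount) VALUES (?, ?)", rows
                )
        return True
    except sqlite3.Error:
        return False

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_sales (id INTEGER PRIMARY KEY, amount REAL NOT NULL)")
conn.commit()

# Both partitions are valid, so all three rows are committed.
ok = load_all_or_nothing(conn, [[(0, 10.0), (4, 2.5)], [(1, 7.0)]])

# The second partition holds a NULL amount, so the first partition's row is rolled back too.
failed = load_all_or_nothing(conn, [[(8, 1.0)], [(9, None)]])

row_count = conn.execute("SELECT COUNT(*) FROM fact_sales").fetchone()[0]
```

After the failed attempt the table still holds only the rows from the successful load, which is the known state the pattern relies on.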

Summing Up

The Modulo Shredder pattern breaks large source files into n smaller, more easily digestible native-format files. It lets you change the degree of parallelization quickly and easily. Because the pattern itself introduces no contention, it is ideally suited to loading fact tables in data warehouses. By implementing it, you can achieve total pipeline throughput rates in the millions of rows per second.


About Ian Posner

Ian Posner is an independent consultant specialising in the design, implementation and troubleshooting of systems that demand the very highest performance and scalability.

