Complex Data Processing Gets Simpler With Cascading

by Dmitry Orekhov, Software Engineering Team Leader, EPAM Belarus

Big data processing has made incredible strides in recent years, and it would be hard to overstate the role of the MapReduce programming model in this progress. However, MapReduce, while powerful, is still almost universally regarded as a complicated and difficult framework to use, even for professional software engineers. Indeed, developing useful applications on Hadoop with pure MapReduce is not a trivial task. The Cascading framework, built on top of Hadoop MapReduce, can simplify this process significantly.

What’s Wrong With MapReduce?

Paradigm

MapReduce requires a programmer to think in terms of "map" and "reduce," an unintuitive programming model. It is much easier to develop complex applications when you work with a model that maps more naturally to your problem domain.

Verbosity and development time

Programming a simple MapReduce job in plain Java requires writing many lines of boilerplate code (e.g., a trivial Word Count requires about 50 lines). There is plenty of Hadoop intrusiveness (Context, Writables, Exceptions, etc.) and low-level glue code. Rich data types and common operations (such as joins, projections and filters) are tedious to implement and, predictably, require custom code plus a considerable amount of development time, code review and testing.
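
To make the verbosity concrete, here is roughly what the map and reduce sides of that trivial Word Count look like in plain Hadoop MapReduce. This is only a sketch; the job driver that configures the Job, the input and output paths and the key/value classes adds another dozen or so lines on top of it:

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Mapper: emit (word, 1) for every token in an input line
    public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
          word.set(tokens.nextToken());
          context.write(word, ONE);
        }
      }
    }

    // Reducer: sum the emitted counts for each word
    class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable result = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
      }
    }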

Optimization and performance

Imagine a complex data flow that results in dozens of MapReduce jobs, each joining, filtering and aggregating data in its own way. Because of Hadoop’s distributed nature and the complexity of the MapReduce paradigm, it is hard to decide how to organize and schedule the execution of such a flow for the best performance. This task requires a huge amount of effort along with an in-depth understanding of Hadoop internals.

In addition, complex multi-staged jobs can be difficult to manage and maintain, and version mismatches can surface during deployment. Moreover, debugging, testability and IDE support all leave something to be desired.


You Can Do It More Simply With…

...Pig and Hive

Because of all these issues, several application frameworks have emerged that try to ease the complexity of writing MapReduce jobs by offering high-level abstractions over MapReduce. They usually provide a set of tools that users may already be familiar with. For example, Hive provides an SQL-like language (HiveQL) to operate on your data; it was designed to appeal to a community already comfortable with SQL and relational databases. Pig, in turn, provides a procedural scripting language (Pig Latin) for expressing data flows, along with a runtime environment where Pig Latin scripts are executed.

...Cascading

Cascading is a thin Java library for defining complex data flows on top of Hadoop and API-compatible distributions. It provides a rich query API, a query planner and a job scheduler, and it abstracts away much of the complexity of Hadoop. Applications developed with Cascading are compiled and packaged into standard Hadoop-compatible JAR files, just like other native Hadoop applications. Cascading lets developers quickly assemble complex distributed data-processing applications and efficiently schedule them based on their dependencies, without having to "think" in MapReduce. Cascading runs on top of both MRv1 and MRv2 (YARN).

Cascading uses a tuple-centric data model (just like Pig and Hive) that works best when your input data can be represented using a named collection of scalar values, much like the rows of a database table. It allows you to think about your data processing workflow in terms of operations on fields, without having to worry about how to transpose this view of the world onto the key/value model of the MapReduce paradigm.
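
As a minimal sketch of this model (the field names below are hypothetical), a record is simply a Tuple whose values are addressed by field name through the Fields and TupleEntry classes, rather than packed into keys and values:

    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;

    public class TupleSketch {
      public static void main(String[] args) {
        // Hypothetical fields describing one web-log record
        Fields logFields = new Fields("ip", "url", "status");

        // A tuple is the ordered list of values; a TupleEntry pairs it with its field names
        TupleEntry entry = new TupleEntry(logFields, new Tuple("10.0.0.1", "/index.html", 200));

        // Values are accessed by field name, not by positional key/value plumbing
        String url = entry.getString("url");
        int status = entry.getInteger("status");
        System.out.println(url + " -> " + status);
      }
    }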

Cascading and Hadoop

Cascading, Hive and Pig were developed in parallel and often perform the same tasks. There are a lot of questions about how these frameworks differ and which one to use where. However, a detailed point-by-point comparison of Hadoop frameworks goes beyond the scope of this article. Instead, let’s take a closer look at why we have chosen Cascading.

Hive and Pig were built to make MapReduce accessible to data analysts with limited programming experience. Both are really great tools for ad-hoc data analysis and quick exploration of data. Both let your data flow in parallel with simple commands and make it easy to manipulate data at scale. At the same time, Hive and Pig have some shortcomings. First, neither HiveQL nor Pig Latin is Turing complete; they lack the control-flow and modularity features present in general-purpose programming languages, such as functions, modules, loops and branches. Because they lack code separation and sharing facilities for complex flows, you need to embed them in external procedural code. Second, anything complicated in Hive or Pig still requires User Defined Functions (UDFs) written in Java. UDFs, while usually not too complex per se, still require a separate code base, so you end up maintaining two separate languages. In addition, it would be nice to have type checking that catches errors at compile time rather than at job submission time (or even three hours after job submission).

Guided by these considerations when choosing a MapReduce framework, we found that Cascading suits us best. As a Java API, Cascading is aimed primarily at developers and allows you to build rich data analytics and data management applications, reusable frameworks and libraries, and to write unit tests, all in any JVM-based language. For example, if you consider Java too verbose, you can use Scalding (a Scala library built on top of Cascading) to write even more concise and readable code. Applications developed with Cascading are compiled and packaged into standard Hadoop-compatible JAR files that you bundle with your job. Any additional operation can be implemented as a plain Java function.
 

Cascading Framework

Imagine a Stream of Fluent Data vs. Key/Value Pairs

The Cascading processing model is based on a metaphor of pipes (data streams), plumbing (pipe assemblies) and filters (data operations). Pipes are created independently from the data they will process. A simple chain of pipes without forks or merges is called a branch; an interconnected set of pipe branches is called a pipe assembly. Each pipe assembly has a head and a tail, which are bound to a source tap (input data) and a sink tap (output data), respectively; binding taps to pipes creates a flow. Any unconnected pipes and taps will cause the planner to throw exceptions.

Cascading represents all data as “tuples.” A tuple is a record of data, and a pipe represents a 'tuple stream' through which tuples flow, so that operations can be performed on that stream. Tuples are composed of fields, much like a database record. Every input or output file has field names associated with it, so that fields may be used both as declarators (declaring the fields an operation produces) and as selectors (selecting the values an operation receives). Every processing element of the pipe assembly either expects the specified fields or creates them.

Data Stream transformation

As data moves through the pipes, streams may be separated or combined (a short sketch of a split and merge follows this list).

  • Split takes a single stream and sends it down multiple paths - that is, it feeds a single Pipe instance into two or more subsequent separate Pipe instances with unique branch names.
  • Merge combines two or more streams that have identical fields into a single stream. This is performed by passing two or more Pipe instances to a Merge or GroupBy pipe.
  • Join combines data from two or more streams that have different fields, based on common field values (analogous to a SQL join).
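
For example, a split of one stream into two branches, followed by a merge of those branches, might be assembled like the sketch below. The stream and field names are hypothetical, and ExpressionFilter removes every tuple for which its expression evaluates to true:

    import cascading.operation.expression.ExpressionFilter;
    import cascading.pipe.Each;
    import cascading.pipe.GroupBy;
    import cascading.pipe.Pipe;
    import cascading.tuple.Fields;

    public class SplitMergeSketch {
      public static Pipe assemble() {
        Pipe events = new Pipe("events");

        // Split: two separately named branches fed from the same upstream pipe
        Pipe errors = new Pipe("errors", events);
        // keep only error responses by removing tuples with status < 400
        errors = new Each(errors, new Fields("status"),
            new ExpressionFilter("status < 400", Integer.TYPE));

        Pipe ok = new Pipe("ok", events);
        // keep only successful responses by removing tuples with status >= 400
        ok = new Each(ok, new Fields("status"),
            new ExpressionFilter("status >= 400", Integer.TYPE));

        // Merge: both branches still have identical fields, so GroupBy can combine them
        return new GroupBy("merged", Pipe.pipes(errors, ok), new Fields("status"));
      }
    }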

The manner of data stream transformation depends on the type of Pipe.

There are six Pipe types, defined as subclasses of Pipe, for operating on tuple streams as they pass through the assemblies: Each, Merge, GroupBy, Every, CoGroup and HashJoin. Pipes may operate on individual tuples (e.g., transform or filter), on groups of related tuples (e.g., count or subtotal), or on entire streams. The following is a short overview of the pipes available in Cascading; a small join sketch follows the list.

  • The Each pipe performs operations based on the data content of individual tuples, applying functions or filters such as conditionally replacing certain field values, removing tuples whose values fall outside a target range, etc. You may also use Each pipes to split or branch a data stream.
  • Merge can be used to combine two or more streams into one, as long as they have the same fields. Merge emits a single stream of tuples (in random order) that contains all the tuples from all the specified input streams.
  • GroupBy groups the tuples of a stream based on common values in a specified field. The purpose of grouping is typically to prepare a stream for processing by the Every pipe, which performs aggregator and buffer operations on the groups, such as counting, totaling or averaging values within each group.
  • Every pipe operates on a tuple stream that has been grouped. Thus, the Every class is only for use on the output of GroupBy or CoGroup, and cannot be used with the output of Each, Merge, or HashJoin.
  • CoGroup and HashJoin perform a join on two or more streams, similar to a SQL join, and group the single resulting output stream on the value of a specified field. The resulting output stream contains fields from all the input streams. The difference between them is that CoGroup is a reduce-side join, while HashJoin is a map-side join that loads the tuples from the right-hand pipe(s) into memory. Thus, HashJoin has some constraints regarding data size, but it is optimized for joining small streams to no more than one large stream.
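
To illustrate the difference, the same inner join can be expressed with either pipe. The sketch below uses hypothetical stream and field names; note that all declared output fields must be unique, hence the renamed key:

    import cascading.pipe.CoGroup;
    import cascading.pipe.HashJoin;
    import cascading.pipe.Pipe;
    import cascading.pipe.joiner.InnerJoin;
    import cascading.tuple.Fields;

    public class JoinSketch {
      public static Pipe[] assemble() {
        // Two hypothetical streams: "users" (user_id, name) and "orders" (order_id, user_id, total)
        Pipe users = new Pipe("users");
        Pipe orders = new Pipe("orders");

        // Reduce-side join: suitable when both streams are large
        Pipe coGrouped = new CoGroup(users, new Fields("user_id"),
            orders, new Fields("user_id"),
            new Fields("user_id", "name", "order_id", "order_user_id", "total"),
            new InnerJoin());

        // Map-side join: the right-hand stream ("users") must be small enough to hold in memory
        Pipe hashJoined = new HashJoin(orders, new Fields("user_id"),
            users, new Fields("user_id"),
            new Fields("order_id", "order_user_id", "total", "user_id", "name"),
            new InnerJoin());

        return new Pipe[] { coGrouped, hashJoined };
      }
    }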


Data transformation

Cascading allows developers to transform data (tuples) as a tuple stream passes through the pipe assemblies: filtering, reorganizing fields, aggregating, and so on. To do this, Cascading provides the concepts of Operation and Fields; a minimal custom operation is sketched after this list.

  • An Operation takes a tuple as input, applies some logic to it and produces zero or more result tuples. Cascading provides several specializations of the Operation interface, such as Function, Filter, Aggregator and Buffer.
  • Fields may be used both to declare field names and to reference field values in a tuple.
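
As an illustration, a minimal custom Filter might look like the following sketch (the "score" field and the threshold are hypothetical):

    import cascading.flow.FlowProcess;
    import cascading.operation.BaseOperation;
    import cascading.operation.Filter;
    import cascading.operation.FilterCall;

    // A minimal custom Filter: drops tuples whose (hypothetical) "score" field falls below a threshold
    public class MinScoreFilter extends BaseOperation implements Filter {
      private final double threshold;

      public MinScoreFilter(double threshold) {
        this.threshold = threshold;
      }

      @Override
      public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        double score = filterCall.getArguments().getDouble("score");
        return score < threshold; // returning true removes the tuple from the stream
      }
    }

Such a filter is then applied to a stream with an Each pipe, for example: pipe = new Each(pipe, new Fields("score"), new MinScoreFilter(0.5));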

To summarize, your code should have a tap to get input data, a tap to dump output data, and in between, some pipes where all the data processing happens.
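
Putting these pieces together, here is a sketch of the complete Word Count from the beginning of this article, written against the Cascading 2.x Hadoop API; the input and output paths are assumed to be passed in as arguments:

    import java.util.Properties;

    import cascading.flow.Flow;
    import cascading.flow.FlowDef;
    import cascading.flow.hadoop.HadoopFlowConnector;
    import cascading.operation.aggregator.Count;
    import cascading.operation.regex.RegexSplitGenerator;
    import cascading.pipe.Each;
    import cascading.pipe.Every;
    import cascading.pipe.GroupBy;
    import cascading.pipe.Pipe;
    import cascading.property.AppProps;
    import cascading.scheme.hadoop.TextDelimited;
    import cascading.scheme.hadoop.TextLine;
    import cascading.tap.Tap;
    import cascading.tap.hadoop.Hfs;
    import cascading.tuple.Fields;

    public class WordCountFlow {
      public static void main(String[] args) {
        String inputPath = args[0];   // source text on HDFS
        String outputPath = args[1];  // destination for the word counts

        // Source tap (input data) and sink tap (output data)
        Tap docTap = new Hfs(new TextLine(new Fields("line")), inputPath);
        Tap wcTap = new Hfs(new TextDelimited(new Fields("word", "count"), "\t"), outputPath);

        // Each: split every line into a stream of "word" tuples
        Pipe docPipe = new Each("wordcount", new Fields("line"),
            new RegexSplitGenerator(new Fields("word"), "\\s+"), Fields.RESULTS);

        // GroupBy + Every: group the tuples by word and count each group
        Pipe wcPipe = new GroupBy(docPipe, new Fields("word"));
        wcPipe = new Every(wcPipe, Fields.ALL, new Count(new Fields("count")), Fields.ALL);

        // Bind the taps to the pipe assembly and let the planner build the flow
        FlowDef flowDef = FlowDef.flowDef()
            .setName("wordcount")
            .addSource(docPipe, docTap)
            .addTailSink(wcPipe, wcTap);

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, WordCountFlow.class);

        Flow flow = new HadoopFlowConnector(properties).connect(flowDef);
        flow.complete();
      }
    }

Cascading’s planner turns this single assembly into the underlying MapReduce job(s) and schedules them based on their dependencies.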

To learn more about the Cascading programming model, please visit http://www.cascading.org/documentation.