Software Scalability with MapReduce
First published online April 2010
The architecture of a software system is the structure and design of the components that work together to achieve the goal of the system as a whole. On large-scale systems, the architecture can describe independent components and their interactions: interface specification, communication mechanisms and coupling. Components can be internal to the software system, or they can be multiple software components on a server or multiple components executing across a network of servers.
Reusability has appealed to computer scientists since the first stored program computer EDSAC (Electronic Delay Storage Automatic Calculator) at the University of Cambridge in the United Kingdom in 1949, where the first subroutine library was proposed. Reuse can occur in many ways in a computer system; common code factored into subroutines and called multiple times from within the application software or common modules linked together in different configurations. There are now widespread examples of software component reuse in commercial and open-source systems. The software industry has embraced the concept of reuse to such an extent that it is now ubiquitous and seldom referred to as reuse anymore.
Established examples of software reuse are component libraries that are developed and maintained for the sole purpose of being integrated into a software application – i.e. being reused by a number of applications. These libraries are widely available in the open-source community and from commercial software houses. Application frameworks are an extension to component libraries that provide a set of subroutines that are fundamental to the structure and execution of the applications. Rather than providing functional elements that are incorporated into an application, the library provides a rich architectural environment into which the developers add bespoke functionality. Application Programming Interfaces (APIs) are a popular mechanism for software vendors to expose functionality from their applications to third party developers in a controlled and secure way without compromising their intellectual property. This enables vendors to improve their market penetration by making their application functionality available through other applications, while maintaining the core business assets. In the internet era, the idea of APIs has been instrumental in the adoption and development of the Web as a platform for modern computing and access to a new audience.
Traditional application-bound APIs have been re-engineered into lightweight and loosely coupled interfaces for exchanging data over relatively slow networks, and new technologies and concepts such as Service Oriented Architecture, AJAX and Adobe Flash have appeared. Web technologies have matured and application frameworks are now available that provide a rich set of application development tools; Google Gears, Dojo and Adobe Flex are examples of these. With the advent of Web 2.0, a new reusable concept has been developed, called a mashup. A mashup is a web page or web application that combines data or functionality from two or more external sources to create a new service. The external sources are integrated into the application by use of their published APIs.
The concept of software component reuse is readily extendable to the architectural design of computer systems. Architecture frameworks provide an execution environment within which a computer system runs. The .Net Framework from Microsoft and the Java Runtime Environment from Sun Microsystems are two very popular frameworks.
MapReduce is an architecture framework designed for creating and processing large volumes of data using clusters of computers. The concepts behind MapReduce have a long history in the functional programming community, in languages such as Lisp. However, it was not until 2004, when Google presented a paper at the Sixth Symposium on Operating Systems Design and Implementation [OSDI'04], that the idea of applying paired map and reduce functions to large data processing problems became a source of widespread interest in mainstream academia, open-source and commercial organizations. Google's paper entitled MapReduce: Simplified Data Processing on Large Clusters [Dean1] is now often cited as the introduction of MapReduce to the problem of data processing.
The MapReduce programming model and distributed execution platform were originally described for easily scaling systems to run efficiently across thousands of commodity machines on a dedicated high-speed network. The model has gained significant momentum in commercial, research and open-source projects since the original paper was published and has been successfully applied to a number of problems, demonstrating its versatility for real-world data processing solutions. An important development from Google's original paper is in applying the MapReduce concept to parallel processing on multi-core environments such as multi-core/multi-processor machines and graphics processors. The scalability achieved using MapReduce to implement data processing across a large number of CPUs with low implementation costs, whether on a single server or multiple machines, is an attractive proposition. In 2008 the original authors updated and republished their seminal paper [Dean2] in response to its growing popularity.
As we have seen, to achieve good scalability, a computer system needs to be able to run many simultaneous threads of execution in isolation from each other, or at least with minimal dependency between executions.
To successfully have independence in the execution, a system also needs to have independence in the supply of input data and the production of output data. If either the input to the processing or the output from the processing needs synchronization between the simultaneous executions, then the benefits are severely diminished. The impact on the performance of a computer system caused by the synchronization of simultaneous executions attempting to access a shared resource is called contention.
One means of overcoming contention on shared input data is to employ a technique known as data partitioning. This involves isolating the data that is being processed by each execution into groups - partitions - which can be read and processed independently of other data partitions. Data partitioning is a fundamental principle in enabling scalability.
Figure 1-1 illustrates data partitioning. On the left, a data source is supplied to a processing unit which is capable of processing multiple streams simultaneously. Data feeds in through a pipeline into a synchronization barrier which allocates data to each of the processing execution streams. It is this barrier that causes contention. While data is being allocated to execution Y, execution X may be starved of data and become idle. If this is repeated regularly, then the contention on input data causes a lot of starvation and the overall runtime increases. The alternative scenario shows the input data has been partitioned and each of the execution streams has its own data source supplying input data. There is no requirement for synchronization and therefore no contention, no starvation of data and no idle processing time.
Data Partitioning Methods
The rules by which data is apportioned to a partition can be a very simple record-counting method or more complex key-based algorithms. A record-counting method, for example, may divide the dataset into partitions based on a principle that a pre-defined number of records belong to each partition. Starting at the beginning of the dataset, the first n records belong to the first partition; the second n records belong to the second partition, etc. until all records have been allocated to a partition. A key-based algorithm may define a key to be the first k characters of a record and for each record in the dataset the partition assignment is calculated based on the value of the key. A common mechanism is to apply a hash function such as Secure Hash Algorithm (SHA), Message-Digest Algorithm 5 (MD5) or Cyclic Redundancy Check (CRC) to the key character string to generate a hash value, and then modulo m is calculated to give the partition number, where m is the number of partitions required.
partition = md5(key) modulo m
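The key-based calculation above can be sketched in Python as follows. This is a minimal illustration, not the code used in the experiment described later: the function names partition_for and partition_dataset, and the default key length k = 4, are assumptions made for the example.

```python
import hashlib


def partition_for(record: str, m: int, k: int = 4) -> int:
    """Return the partition number (0 .. m-1) for a record.

    The key is taken to be the first k characters of the record;
    k = 4 is an arbitrary value chosen for this sketch.
    """
    key = record[:k]
    digest = hashlib.md5(key.encode("utf-8")).digest()
    # Treat the 128-bit digest as one large integer, then take modulo m.
    return int.from_bytes(digest, "big") % m


def partition_dataset(records, m, k=4):
    """Group records into m lists according to their hashed key."""
    partitions = [[] for _ in range(m)]
    for record in records:
        partitions[partition_for(record, m, k)].append(record)
    return partitions
```

Records that share a key always land in the same partition, and the number of partitions never exceeds m regardless of how large the dataset grows.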
An important consideration when selecting a partitioning scheme is the total number of partitions that will be created. In the first example, the record-counting method, the number of partitions is logically unlimited and will be determined purely by the size of the input dataset. Undefined limits such as these can be dangerous in a computer system. Even the largest and most scalable systems have a limit, so they must be prepared for the situation where the physical limit is exceeded because of the logically unlimited parameter.
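A record-counting partitioner might look like the sketch below. The max_partitions guard is a hypothetical addition for illustration; it is one simple way for a system to be prepared for a dataset that would otherwise produce more partitions than it can physically handle.

```python
def partition_by_count(records, n, max_partitions=None):
    """Split records into consecutive partitions of n records each.

    The final partition may hold fewer than n records. max_partitions
    is an assumed safety limit, not part of the method itself.
    """
    if n <= 0:
        raise ValueError("n must be a positive number of records")
    partitions = [records[i:i + n] for i in range(0, len(records), n)]
    if max_partitions is not None and len(partitions) > max_partitions:
        raise RuntimeError(
            f"{len(partitions)} partitions exceeds the limit of {max_partitions}"
        )
    return partitions


# Ten records with n = 4 give partitions of 4, 4 and 2 records.
print([len(p) for p in partition_by_count(list(range(10)), 4)])  # [4, 4, 2]
```

The number of partitions here is the dataset size divided by n, rounded up, so it grows without bound as the input grows.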
The key-based partitioning mechanism pre-defines the maximum number of partitions that will be created. It is important to select a good hash function that will generate each potential hash value with roughly equal probability. This helps to keep each partition broadly equal in size, so that each data processing execution has a similar amount of work to do.
To illustrate the selection of hash functions, consider an experiment performed using four hash functions: a 32-bit word implementation of the Secure Hash Algorithm SHA256, the MD5 algorithm producing a 128-bit hash value, and two versions of Cyclic Redundancy Check, the 16-bit and 32-bit wide implementations CRC16 and CRC32 respectively.
Figure 1-2 shows an example of partition allocations in a test where a sample dataset was passed through a number of hash functions and the partition number in each case was calculated using modulo 10 of the calculated hash value. For each partition number along the x-axis, the frequency of keys allocated to that partition for each hash function is plotted on the y-axis. The scale of the y-axis is large to show the variance clearly as the range is relatively small; between 9.64% (SHA256's allocation to partition 7) and 10.24% (MD5's allocation to partition 0). The optimum hashing function is the one that is plotted as the flattest line, which in this sample dataset is the CRC16 function (9.94% - 10.07%) and the most unbalanced partition set was generated using SHA256 (9.64% - 10.23%).
Note that these results are from a sample test dataset. Hash function performance is very dependent on the data itself, so it is impossible to select a single hash function that will produce an optimum distribution of data partitions for all input datasets.
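Because the result depends on the data, it is worth measuring the distribution on a sample of the real keys. The sketch below shows one way to do this in Python; it is not the code used for the experiment above. The HASHES table, the function names and the use of binascii.crc_hqx (a CRC-CCITT variant) for CRC16 are assumptions, since the article does not specify which CRC16 polynomial was used.

```python
import binascii
import hashlib
import zlib
from collections import Counter

# Each entry maps a name to a function from bytes to an integer hash value.
HASHES = {
    "SHA256": lambda b: int.from_bytes(hashlib.sha256(b).digest(), "big"),
    "MD5": lambda b: int.from_bytes(hashlib.md5(b).digest(), "big"),
    "CRC16": lambda b: binascii.crc_hqx(b, 0),  # assumed CRC16 variant
    "CRC32": lambda b: zlib.crc32(b),
}


def partition_shares(keys, hash_fn, m=10):
    """Return the percentage of keys allocated to each of m partitions."""
    counts = Counter(hash_fn(k.encode("utf-8")) % m for k in keys)
    return [100.0 * counts.get(p, 0) / len(keys) for p in range(m)]


def spread(shares):
    """Difference between the fullest and emptiest partition, in percent."""
    return max(shares) - min(shares)


def compare(keys, m=10):
    """Return {hash name: spread} so the flattest distribution can be chosen."""
    return {name: spread(partition_shares(keys, fn, m)) for name, fn in HASHES.items()}
```

Calling compare on a representative sample of keys gives one number per hash function; the smallest spread corresponds to the flattest line in a plot such as Figure 1-2.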
Hash functions themselves have a cost in execution time, so there is a balance to consider. Some hash functions are very expensive to calculate and the overhead of using one hash function over another may not be justified with the saving in processing time gained by a slightly more balanced dataset.
There is little benefit in using a complex and computationally expensive hash algorithm to calculate a perfectly distributed hash value if it is used simply to calculate the partition assignment by taking modulo m of the value, particularly if m is small. Figure 1-3 shows the relative runtimes of each hash function from the experiment above. In this example, CRC32 performed the quickest, followed by CRC16. The CRC algorithms therefore produce the best distribution and execute the fastest of the four test hash functions, but there is a choice to be made between CRC16 and CRC32 on distribution versus performance.
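The relative cost of candidate hash functions can be estimated with a short timing harness. The following is a sketch using Python's standard timeit module; the names HASHES and time_hashes are assumptions, as is the choice of binascii.crc_hqx for CRC16. Absolute timings will differ between machines and library versions, so only the relative ordering on the target system is meaningful.

```python
import binascii
import hashlib
import timeit
import zlib

# Candidate functions, each taking bytes; return values are ignored here.
HASHES = {
    "SHA256": lambda b: hashlib.sha256(b).digest(),
    "MD5": lambda b: hashlib.md5(b).digest(),
    "CRC16": lambda b: binascii.crc_hqx(b, 0),  # assumed CRC16 variant
    "CRC32": zlib.crc32,
}


def time_hashes(keys, repeats=5):
    """Return {name: best time in seconds to hash every key once}."""
    timings = {}
    for name, fn in HASHES.items():
        # Take the minimum of several runs to reduce scheduling noise.
        runs = timeit.repeat(lambda: [fn(k) for k in keys], number=1, repeat=repeats)
        timings[name] = min(runs)
    return timings
```

Combining these timings with a measure of partition balance gives the two sides of the trade-off described above.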
Balancing the dataset sizes in this way is important to optimize the data processing and reduce the overall running time. Each partition of data is processed independently and simultaneously, and - assuming there is enough processing resource available to process every partition simultaneously - the end-to-end time of processing the entire set of data will be determined by the longest running process. It follows that the longest running process will be the one handling the largest partition, and therefore partitions of equal size give the best chance of finishing in the quickest time.
Figure 1-4 illustrates the relative execution time for processing the same ten partitions in this experiment. The overall time-to-completion is dictated by the longest partition execution time, and an unbalanced set of data partitions therefore skews the overall runtime performance. Here, the MD5 hash function created a large partition 0 disproportionate to partition 6, so processing completes on partition 6 in about half the time of partition 0. Conversely, the CRC16 hash function created values with a more even distribution and resulted in more evenly sized partitions. These partitions were processed in parallel and all finished within a short time of each other, thus completing the overall processing in a lesser amount of time than using the other hash functions.
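The effect of imbalance can be shown with a small worked example. The sketch assumes one worker per partition and a processing time proportional to the number of records; the partition sizes and the function name completion_time are made up for illustration and are not taken from the experiment.

```python
def completion_time(partition_sizes, seconds_per_record=1.0):
    """End-to-end time when every partition is processed in parallel.

    Assumes one worker per partition and time proportional to size,
    so the job finishes when the largest partition finishes.
    """
    return max(partition_sizes) * seconds_per_record


balanced = [100, 100, 100, 100]  # 400 records, evenly spread
skewed = [160, 80, 80, 80]       # the same 400 records, unevenly spread

print(completion_time(balanced))  # 100.0
print(completion_time(skewed))    # 160.0
```

Both datasets contain the same amount of work, but the skewed set takes 60% longer because three workers sit idle while the fourth finishes.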
Data Storage Strategies
Using data partitioning, the contention is moved from the processing to the supply of the data. If every partition of data is stored on the same single disk volume, which has the ability to read only one piece of data at a time, then contention is seen here instead. Modern hard disk drives and their software drivers are very efficient in servicing multiple simultaneous read/write requests; nonetheless, reading data from a hard disk drive is relatively slow in any computer system. A means of overcoming contention on the hard disk is to use multiple drives. Storing independent data partitions on separate volumes avoids disk contention and can improve performance significantly.
As a computer system scales, it consists of more hardware and more software, and that means there is more to go wrong. On the positive side, it also means that there is more opportunity to automatically overcome a single point of failure.
If a system is running entirely on a single machine, there is no communication with other machines to go wrong, and the only points of failure are local; the machine components may fail, or the software on the machine may exhibit a fault. In either case, there is often very little that can be done to automatically recover from these failures. A hardware failure will typically cause the system to halt, in which case there is no opportunity for any automatic recovery action. A software fault may be more recoverable, but without any remedial action being taken, a restart of the software will most likely see a repeat of the fault in the near future.
Conversely, consider a computer system that is distributed across multiple machines connected together by a local area network, such as Ethernet. Each of the machines processes data and produces a subset of results, working in conjunction with the others towards the system's end goal. Passing data and messages between the machines is critical to the successful and efficient functioning of the system.
Where are the potential points of failure in this computer system?
Each machine has many hardware components that may fail causing the machine itself to fail
Each machine runs software that will contain programming errors that may cause the machine to fail
Each machine is connected to every other machine via a network. Networks are inherently unreliable and a failure in the network can prevent machines from communicating
The first two points are the same as in a single machine system. However, the difference is that the failure of an individual machine does not need to result in failure of the entire system. As the volume of hardware and software that is used to construct a computer system increases, so does the opportunity and probability of failures occurring. Consequently, automatic reaction to failures within the system and an ability to reconfigure the working environment to have the system succeed in the event of individual failures become increasingly important in the scalability of a system. This resilience to failure is known as fault tolerance.
The most fundamental mechanism for software fault tolerance is built into every good software application without even being called fault tolerance; rather, it is understood to be good practice in software engineering. Error handling, by checking error codes after calls to an API function or exception handling, for example, should be built into the core of a software system to take appropriate action if an error occurs. Typically, expected errors are those that can occur during the normal running of a program and are treated simply as a conditional path of execution. Exceptions are errors that are known to be possible, but not expected during the normal course of operation of the system. An expected error is therefore recoverable whereas an exception can often result in the system aborting with no appropriate recovery action being available.
Complex safety critical systems can employ a technique called diversity to increase the reliability of the system. This technique involves multiple implementations of one specification, which enables the system to identify and recover from errors in one of the implementations. Each alternative implementation is independently designed to produce the same result. The module determines the correct result based upon the individual results obtained from the alternative implementations. Ideally, all implementations produce the same result; otherwise a choice is made based upon select criteria [Inacio].
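Diversity can be sketched as a voting module that runs each alternative implementation and compares the results. The example below assumes a simple majority vote as the selection criterion, which is only one of several possible criteria; the vote function and the three toy implementations are hypothetical and exist purely for illustration.

```python
from collections import Counter


def vote(implementations, *args):
    """Run every implementation and return the majority result.

    A version that raises an exception contributes no vote. Results
    must be hashable so that they can be counted.
    """
    results = []
    for impl in implementations:
        try:
            results.append(impl(*args))
        except Exception:
            continue  # a failed version is simply outvoted
    if not results:
        raise RuntimeError("every implementation failed")
    winner, count = Counter(results).most_common(1)[0]
    if count <= len(implementations) // 2:
        raise RuntimeError("no majority agreement between implementations")
    return winner


# Three independent versions of "largest value"; the third is deliberately faulty.
def max_builtin(xs):
    return max(xs)


def max_sorted(xs):
    return sorted(xs)[-1]


def max_faulty(xs):
    return xs[0]


print(vote([max_builtin, max_sorted, max_faulty], [3, 9, 4]))  # 9
```

The faulty version returns 3, but it is outvoted by the two versions that agree on 9.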
Handling hardware failures can be more challenging than handling software failures as the remedial action must be taken from outside of the failing component. Software can intrinsically detect error conditions and react appropriately and while individual hardware components can do the same, handling the failure of hardware must be done externally.
To guarantee recovery from all hardware failures, the overall system must always maintain a state where the absence of any individual piece of hardware will have no effect on the ability of the system to function. Two techniques can be used in conjunction with each other to satisfy this requirement: hardware replication and hardware redundancy. Replication can be applied to data and processing; data is stored on multiple storage devices so that it is available from a different device if one fails, and processing is executed several times in case one fails to complete. Redundancy is implemented by duplicating key components within the system so that if one of the components fails, then the other can be used in its place. A common example of redundancy is a RAID device (redundant array of inexpensive disks) which combines multiple hard disk drives and duplicates data written to the device across the multiple disks so that if a single disk fails, the data is still accessible.
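Data replication can be illustrated from the reader's point of view: if one copy is unavailable, the read falls through to the next. The sketch below models each replica as a plain function that either returns the data or raises OSError; read_with_failover, failed_disk and healthy_disk are hypothetical names, and a real system would also need to detect and repair the failed copy.

```python
def read_with_failover(replicas, key):
    """Return the data for key from the first replica that responds."""
    last_error = None
    for replica in replicas:
        try:
            return replica(key)
        except OSError as exc:
            last_error = exc  # remember the failure and try the next copy
    raise RuntimeError("all replicas failed") from last_error


# Two simulated storage devices holding the same partition.
def failed_disk(key):
    raise OSError("disk unavailable")


def healthy_disk(key):
    return {"part-0": b"records"}[key]


print(read_with_failover([failed_disk, healthy_disk], "part-0"))  # b'records'
```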
The MapReduce framework provides for fault tolerance in its execution. This inherent support is a critical benefit of the framework.
Scale and Context of Software Systems
It is only quite recently that mainstream engineers and architects have begun to think about software systems that can be larger than a single machine. The size and capacity of the machine and its peripheral devices have previously been the constraint on the size and complexity of the software and the volume of data that can be processed by a single system.
The introduction of multi-core processors to laptops, desktops and servers has done little to advance the architecture of software. Instead, the industry has embraced these processors as a way for machines to run more software rather than an opportunity to improve software to run faster and more efficiently.
Writing good, fast and efficient multitasking software is difficult with most current programming languages because the languages were designed before the advent of multitasking capabilities and therefore lack primitives to support synchronization of execution threads, cache management and other considerations for high performance multitasking software.
Scalability is therefore an important consideration at every degree of granularity in a software system where a new dimension of execution is introduced.
Can the software scale efficiently from a single thread of execution to running multiple concurrent threads?
Can the system scale efficiently from a single machine to a cluster of interconnected machines on a local area network?
Each of these step changes in the dimension of software execution has traditionally been tackled as a separate, distinct problem with its own complex solutions, which themselves have shortcomings. The combination of these problems has highlighted the complexity and difficulty of developing truly efficient scalable software systems.
While MapReduce is no silver bullet, it does provide a framework and programming discipline that can be successfully applied to the problem of scaling software applications through multi-core processors and multiple machine cluster infrastructures. The framework abstracts the complex parallelization, synchronization and communication mechanisms and protects the programmer from having to consider these aspects of scalability in each system implementation. A common architecture, albeit with a necessarily different implementation for each granularity of scale, also enables a degree of reusability that has not been seen before. A system can be re-designed from a current single-threaded implementation to a MapReduce architecture, and implemented to scale to a multithreaded single-machine environment as well as a multiple-machine cluster of interconnected computers.
At the core of MapReduce are the two phases of execution that give the framework its name. A Map Phase processes the system's input data and produces intermediate results which are sorted and collated by the infrastructure and passed to the Reduce Phase which further processes data in groups of identical keys, and produces the final results. An overview of a MapReduce Job is shown in Figure 1-5.
MapReduce references data using pairs of data items – a key item and a value item – called a key/value pair. A MapReduce job is executed by first applying a mapping function to a set of partitioned input data of key/value pairs. Multiple mapping functions are executed simultaneously, each processing a distinct partition of data. The number of simultaneous functions executed is dependent on the capacity and workload of the environment as well as user-specified recommendations. Other data partitions are queued by the framework until capacity is available to execute them, either by more resource being made available to the system, or by other map functions completing. Output from the map phase is an intermediate sequence of key/value pairs, which the framework collates and sorts by their key. A reducing function is then applied atomically to each group of values with identical keys. A number of reducing functions run concurrently to produce a result set of key/value pairs which form the output of the job.
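The flow described above can be sketched with a word count, the customary introductory MapReduce example. This is a sequential, single-process illustration of the programming model only; it does not reproduce Google's framework or any other implementation. The names run_job, map_words and reduce_counts are assumptions, and a real framework would execute the map and reduce functions concurrently and handle queuing and failures.

```python
from itertools import groupby
from operator import itemgetter


def map_words(key, value):
    """Map: emit (word, 1) for every word in one line of text."""
    for word in value.split():
        yield word.lower(), 1


def reduce_counts(key, values):
    """Reduce: sum all the counts emitted for one word."""
    yield key, sum(values)


def run_job(partitions, map_fn, reduce_fn):
    """Apply map_fn to every partition, collate by key, then apply reduce_fn."""
    intermediate = []
    for partition in partitions:  # a real framework runs these in parallel
        for key, value in partition:
            intermediate.extend(map_fn(key, value))
    # The framework's job: sort and group intermediate pairs by key.
    intermediate.sort(key=itemgetter(0))
    output = []
    for key, group in groupby(intermediate, key=itemgetter(0)):
        output.extend(reduce_fn(key, [value for _, value in group]))
    return output


partitions = [
    [("doc1", "the cat")],  # partition 0: (document name, text) pairs
    [("doc2", "the dog")],  # partition 1
]
print(run_job(partitions, map_words, reduce_counts))
# [('cat', 1), ('dog', 1), ('the', 2)]
```

Only map_words and reduce_counts are specific to the problem; run_job stands in for the infrastructure that a MapReduce framework supplies.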
The algorithmic pattern of MapReduce is not itself new, but what Google have done is to apply the theory and create an infrastructure for using the pattern on large datasets across many machines on a local network. In doing this, they have popularized an already available but largely unexposed technique, and made it available to the computer science community by documenting sufficient details of the implementation of Google's own framework that others have been able to re-create a version of it and further develop the MapReduce concept. There are a number of MapReduce implementations available across a plurality of hardware platforms, the most widely known and popular of which is Hadoop. I have developed an open source C++ implementation of a MapReduce framework for a single machine multitasking environment.
MapReduce will not solve all the world's scalability and data processing problems. It is very rare for one-size-fits-all to apply in computing, and the complexity of scalable software is definitely not a situation where it does. MapReduce is a very useful framework that enables massive scalability of appropriate computing problems. The important point here is appropriate computing problems; the best advice, as ever, is to use the right tool for the job and not to try to force a problem into fitting a solution. Let the definition of the problem and the constraints of the system determine the selection of an appropriate solution. Be aware of architectures that are well defined and proven to work, and take time to understand the pitfalls and drawbacks as well as the more obvious advantages of each. Finally, be prepared to use a collection of tools where necessary to create a hybrid architecture that defines a good solution to the problem at hand.