The Design of Software (CLOSED)

A public forum for discussing the design of software, from the user interface to the code architecture. Now closed.

The "Design of Software" discussion group has been merged with the main Joel on Software discussion group.

The archives will remain online indefinitely.

A MapReduce doubt

The MapReduce paradigm is primarily aimed at parallelizing tasks so that they can be executed across a distributed system.

I can understand that the "map" part of the problem can be executed in parallel. What I am confused about is the Reduce part.

Since Reduce has to aggregate the results of all Map tasks, does it mean that
1. Reduce would run only on a single node in the cluster?
OR
2. the Map task output is fed to intermediate Reduce tasks, whose output is fed back to Map until all the aggregation is done, in which case Reduce too can be distributed across many nodes?

Which of 1 or 2 is correct?
Sathya
Wednesday, August 27, 2008
 
 
Reduce tasks can operate on more than one node, but Map/Reduce is an N:M node task where M < N, often by a large margin. This needs to be factored in when you design the two stages so as to make them more parallel. A Partitioner determines the spread of work across the reduce tasks operating against the Map's output.

There are two approaches that I've seen for implementing Map/Reduce frameworks. The approach Google describes in their paper, and the way I implemented it, uses a multi-stage Master/Worker approach. Three master/worker tasks are needed (Map, Sort/Shuffle, Reduce), and they are dependent on each other. This makes the design quite flexible and makes Master/Worker the basic construct on top of which other abstractions can be implemented. I implemented Map/Reduce and Fork/Join on top of mine.
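
A minimal, framework-free sketch of those three stages in Python might help picture it (all names below are illustrative, not from any real framework):

from collections import defaultdict

def map_stage(documents):
    # Map: each input is processed independently, emitting (key, value) pairs.
    for doc in documents:
        for word in doc.split():
            yield (word, 1)

def shuffle_stage(pairs):
    # Sort/Shuffle: group every value by its key so each key ends up in one place.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_stage(groups):
    # Reduce: aggregate the grouped values for each key.
    return {key: sum(values) for key, values in groups.items()}

print(reduce_stage(shuffle_stage(map_stage(["the cat sat", "the dog sat"]))))
# -> {'the': 2, 'cat': 1, 'sat': 2, 'dog': 1}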

The approach that Hadoop took was a supervisor model that monitors a mesh of map/reduce nodes. Every Map node knows of every Reduce node. This allows it to partition the results in place and push them to the correct Reduce node, thereby allowing both to execute in parallel. This greatly simplifies their sort/shuffle process, but creates a hairball of a distributed environment. In Hadoop, Map/Reduce is the basic construct from which a Master/Worker construct can be simulated via an identity stage (very common).
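
A rough sketch of that map-side partition-and-push idea (the two-reducer split and the crc32 hash here are just for illustration):

import zlib

NUM_REDUCERS = 2

def partition(key):
    # Each Map node runs this locally, so it knows which Reduce node gets the pair.
    return zlib.crc32(key.encode()) % NUM_REDUCERS

# Stand-ins for the per-Reduce-node inputs; a Map node pushes into these directly.
reducer_inputs = [[] for _ in range(NUM_REDUCERS)]
for word in "the cat sat on the mat".split():
    reducer_inputs[partition(word)].append((word, 1))

# Every occurrence of a given word lands in the same reducer_inputs list.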

In Google's paper they mention that it is very common to see multiple discrete Map/Reduce stages being performed to process a data set. So you can have Reducers feed into Mappers, but it's not done explicitly that way by the framework.

I have heard stories where users of Hadoop defined poor partitioning functions that resulted in only a small number of reduce nodes being used. When this was realized and fixed, they saw orders-of-magnitude improvements. So while Map/Reduce makes distributed processing easier, naive developers can still make it perform badly.
Benjamin Manes
Wednesday, August 27, 2008
 
 
Hi Benjamin Manes

Thanks for the pointers. After reading your reply, I also referred to Google's slides on MapReduce, which contain diagrams of the MapReduce process. As you have mentioned, there are fewer reduce operations compared to maps.

Let us take the proverbial word count example that is used to explain map-reduce. Let us assume a document contains 100 distinct words, with each word occurring more than once. Now I would like to visualize the map and reduce tasks. Each word is emitted as a (word, count) pair. Now, when performing the reduce, is it the case that all map tasks of a particular word are reduced by ONE reduce task? A reduce task can handle (reduce) more than one word, but all map tasks for a word are reduced by one reduce task? This still preserves the N:M that you mentioned, but for a given word it is N:1.

My confusion still is that if reduce is going to be randomly allocated, then shouldn't there be multiple levels of reduce so that eventually there is just one output? Or am I missing something critical?

Please bear with my ignorance.
Sathya
Wednesday, August 27, 2008
 
 
Given that there is more than one reduce task, there would have to be a final stage.

I haven't read the Google paper in a while, but I thought I read that the original master, the one who started the reduce/map process, would eventually gather the reduce results and compile them (please correct me if I'm wrong on this).
Brian Shipe
Wednesday, August 27, 2008
 
 
Sathya:
> when performing the reduce, is it the case that all map
> tasks of a particular word are reduced by ONE reduce
> task?

Yes. All word pairs for a particular word are reduced by one reducer. If there is a good distribution of words, this won't hurt performance, as other reducers will be working on other pairs. Also, the amount of work being performed is lessened by using a Combiner, which is simply the reduce task performed locally by the Mapper on its own output.
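
Sticking with word count, a hedged sketch of where a Combiner sits (the function names are mine, not Hadoop's):

from collections import Counter

def mapper(document):
    # Emit (word, 1) for every word in this mapper's input split.
    return [(word, 1) for word in document.split()]

def combiner(pairs):
    # Same aggregation as the reducer, but run locally on one mapper's output,
    # so far fewer pairs have to cross the network.
    counts = Counter()
    for word, n in pairs:
        counts[word] += n
    return list(counts.items())

def reducer(word, counts):
    # All pairs for a given word arrive at exactly one reducer.
    return word, sum(counts)

local = combiner(mapper("the cat sat on the mat near the door"))
# ('the', 3) is sent once instead of three separate ('the', 1) pairs.
print(reducer('the', [3, 2]))   # counts arriving from two mappers -> ('the', 5)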

A poor partitioning function would be one that splits based on the word's first character. It would allow for at most 26 reducers, with a poor distribution (the workload of the "s" reducer > the "z" reducer). Since each reducer is given a unique working set, there is no feedback process needed, as each result will reflect a unique set of words.

Brian Shipe:
> the one who started the reduce/map process, would
> eventually gather the reduce results and compile them

If I remember correctly, Hadoop doesn't do this. It just announces completion, which means you have M result files. You need to append them yourself to arrive at a single file.

For Google, I don't remember them specifying but I would guess that you are correct. Since they tie into their distributed filesystem, I wouldn't be surprised if it supported a concat operation that was extremely cheap. It wouldn't need to perform expensive copying I/O, but simply update the chunk information (inode-like file info) to make it look like one file.
Benjamin Manes
Wednesday, August 27, 2008
 
 
Thanks a ton, Benjamin. It's been of great use in understanding MR.

One last clarification.

Since map and reduce are independent programs, and the underlying MapReduce platform (like Hadoop) is generic, is it correct to assume that allocation of a Reducer is based on the "key" values? In your example above of having 26 reducers in a poorly written partition, does it mean that the key value is the alphabet? The underlying framework then directs all (key, val) pairs to the appropriate reducer.

Then is it up to the framework to determine the approximate number of reducers based on the spread (or how diverse) of the key values? As in the example above, if the key is the starting letter of the word, then in a well-distributed document there would eventually be 26 key values (as determined by the map function).

When I looked into the Hadoop examples, for the word count program with 3 input files, there are 3 mapper tasks and one reducer task. So I presume the logic here is one mapper per file and one reducer overall. You can find the example here:
http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
In particular, look at the table that lists the Map and Reduce tasks (3 maps and 1 reduce):
http://www.michael-noll.com/wiki/Image:Hadoop-web-interface-screenshot.png

In short,
1) How does the MR infrastructure (e.g., Hadoop) decide the number of Map and Reduce tasks?
2) How does the MR infrastructure allocate the reducers?

Answers to these questions would help developers like me understand how to write efficient MapReduce programs.

I too will look out for answers and post them if I come by any.
Sathya
Wednesday, August 27, 2008
 
 
> Is it correct to assume that allocation of a Reducer is
> based on the "key" values?

Yes. The partitioner is king.

> In your example above of having 26 reducers in a poorly
> written partition, does it mean that the key value is
> the alphabet?

In my example, the keys are words and the hashing function is the alphabet. The hashing is based on the first letter in the word which makes it trivially simple but scales to only 26 nodes. In practice, you would hash based on the number of nodes you can work with (key % M).
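
As a sketch (not real Hadoop code), the two partitioners from this discussion might look like:

import zlib

NUM_REDUCERS = 10   # M, chosen to match the nodes you actually have

def first_letter_partition(word):
    # Trivially simple, but caps out at 26 reducers and skews toward common letters.
    return ord(word[0].lower()) - ord('a')

def hashed_partition(word):
    # hash(key) % M: spreads keys across however many reducers are available.
    return zlib.crc32(word.encode()) % NUM_REDUCERS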

> How does the MR infrastructure (e.g., Hadoop) decide the
> number of Map and Reduce tasks?

In Hadoop, you must explicitly state the number of Mappers and Reducers in the JobConfiguration. You have to do this because it needs to construct the mesh. Whether you use them all is a different story, of course, since you may limit yourself by having a bad partition function. The Map side doesn't have a real limit, since you spread the data evenly across the nodes. The number of Reduce tasks is dictated by how evenly your data can be spread and by the partition function. A good partitioner and a uniform data set should let it scale linearly. The framework just makes things easier; people will always find ways to screw it up.

Think of the partition function as a hashing function and the reducers as buckets in a hash-table. If you have a good hashing function, you will have a nice spread across the buckets. If you have a bad one, everything will go into a few buckets. For a hash table it would be expensive to scan the bucket's linked-list. For Map/Reduce it would make a node do an unfair amount of the work. A poor hash would cause performance problems for both algorithms.
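
You can see the analogy directly by counting how many words would land in each "bucket" under the two schemes (again only a sketch; the file name is just a placeholder for any sample text):

from collections import Counter
import zlib

words = open("sample.txt").read().lower().split()   # placeholder input file
by_letter = Counter(w[0] for w in words if w[0].isalpha())
by_hash = Counter(zlib.crc32(w.encode()) % 10 for w in words)

print(by_letter.most_common(3))   # a few letters dominate -> a few overloaded reducers
print(by_hash.most_common(3))     # roughly even counts   -> balanced reducers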

> How does the MR infrastructure allocate the reducers?

They are just worker nodes, so it just needs to acquire them either explicitly (Hadoop) or implicitly (mine/Google's). You can keep a running count of nodes and the workload on the network fairly easily to dynamically size jobs. I simply dispatch work onto a queue and allow the message bus to route it to a free processing node. Allocation and messaging are not difficult problems.
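
In a single process, that dispatch model can be sketched with a plain work queue standing in for the message bus (everything below is illustrative, not the actual implementation):

import queue
import threading

work = queue.Queue()
results = queue.Queue()

def worker():
    # A free node just pulls the next small, recoverable request.
    while True:
        item = work.get()
        if item is None:
            break                      # shutdown signal
        results.put(item * item)       # stand-in for a map or reduce task
        work.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for n in range(100):                   # the "master" just sprays out requests
    work.put(n)
work.join()                            # all requests processed

for _ in threads:
    work.put(None)
for t in threads:
    t.join()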

---------------------------------------------------------

One aspect that might make this simpler to understand is a limitation that I had to design for. We don't use a distributed file system, nor were they willing to adopt one. This meant that I needed to send small, discrete messages for computation and allow the master to handle thousands of workers concurrently. So I have a huge number of short-running work requests, whereas Hadoop/Google have a few long-running work requests. They can simply write the results to the DFS as a global data store that survives worker node crashes, whereas I must give workers small requests and must use the master's local FS. This meant I had to put much more effort into constructing an extremely efficient concurrent algorithm. I discovered Hadoop a few years later and was amazed at just how awful their implementation was. But they could get away with it...

For me, master/worker tasks are bounded entirely by the number of available nodes. I don't allocate nodes but allow the message bus to spray out small, recoverable requests. As with all implementations, my limit on the number of reducers is based entirely on the partition function. So allocation is trivial. Don't focus too much on those aspects, since from a framework perspective it's simple. Once you begin using a framework, it is easy to write efficient tasks, and if you don't, it is fairly obvious how to fix things.
Benjamin Manes
Thursday, August 28, 2008
 
 
Thanks for the detailed explanation. It has cleared almost all my doubts.

I understand it would have been really challenging to build an infrastructure without a DFS (even though one might have scalability concerns with this approach; but then it is perhaps very specific to the client for whom you built it, I guess).
In particular, writing extremely concurrent algorithms and spraying out small recoverable requests must indeed have been challenging, I guess.

Thanks once again Benjamin.
Sathya
Thursday, August 28, 2008
 
 
MapReduce can be complex...but it can be very powerful when combining structured and unstructured data for deep analysis.

Aster Data just came out with an integrated MapReduce + SQL database solution...makes it much easier to just leverage SQL commands/types/functions whenever possible, instead of having to keep re-inventing the wheel in Java!

http://www.asterdata.com/product/mapreduce.html
FacebookDev
Thursday, August 28, 2008
 
 

This topic is archived. No further replies will be accepted.
