Hi Jeff, Thanks a lot for your explanation. It really helps me understand the details of the job workflow.
Hi all,

Thanks a lot for your help. One more question: from the monitoring data I find that the iowait% is quite high. Do you think this is normal, given that a lot of data is read and written, as well as copied over the network? The overall input data to be sorted is 500 GB, and my testing environment is a 10-node cluster. I've tried increasing related parameters such as "io.sort.mb" and "io.sort.factor", and used LZO compression, but it seems I still can't reduce the iowait%. Any comments? Thanks.

Best Regards,
Carp

2010/6/18 Jeff Zhang <zjf...@gmail.com>:

> The scale of each reducer depends on the Partitioner. You can think of the
> Partitioner as a hash function and of each reducer as a bucket, so you
> cannot expect every bucket to hold the same number of items.
>
> Skewed data distribution will make a few reducers cost much more time.
>
> 2010/6/18 李钰 <car...@gmail.com>:
> > Hi Jeff and Amogh,
> >
> > Thanks for your comments! In my understanding, in the partitioning phase
> > before spilling to disk, the threads divide the data into partitions
> > corresponding to the number of reducers, as described in the Definitive
> > Guide. So I think the scale of the input data should be the same for
> > each reducer. If I have misunderstood anything, please correct me,
> > thanks.
> >
> > As to the reduce phases, I did check the times of shuffle, sort and
> > reduce through the JT UI, but found them quite different for each reduce
> > task. Some tasks have a longer shuffle time but a shorter reduce time,
> > while others have a shorter shuffle time but a longer reduce time. I set
> > the reducer number large enough to let all reduce tasks run in parallel,
> > and set "mapred.reduce.slowstart.completed.maps" to 1.0 so that they all
> > start at the same time, once every map task has finished; I think this
> > should reduce the impact of the network and of waiting for map tasks to
> > finish during the shuffle phase.
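(For reference: the parameters mentioned above are normally set in mapred-site.xml. A sketch with illustrative values only, not tuning recommendations; the LZO codec class assumes the hadoop-lzo package is installed:)

```xml
<!-- mapred-site.xml (0.20-era property names); values are illustrative -->
<property>
  <name>io.sort.mb</name>           <!-- map-side sort buffer, in MB -->
  <value>200</value>
</property>
<property>
  <name>io.sort.factor</name>       <!-- streams merged at once during sorts -->
  <value>100</value>
</property>
<property>
  <name>mapred.compress.map.output</name>  <!-- compress intermediate data -->
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
```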
> > Then why do I still see quite different times spent in shuffle? And
> > since the reduce phase of a reduce task is just writing sorted data into
> > HDFS, why do the reduce-phase times differ?
> >
> > Is anything wrong with my analysis? Any suggestions? Thanks a lot.
> >
> > Dear all,
> >
> > Any other comments? Thanks.
> >
> > Best Regards,
> > Carp
> >
> > On Jun 18, 2010, at 11:39 AM, Amogh Vasekar <am...@yahoo-inc.com> wrote:
> > >
> > > > Since the scale of input data and operations of each reduce task is
> > > > the same, what may cause the execution time of reduce tasks to
> > > > differ?
> > >
> > > You should consider looking at the copy, shuffle and reduce times
> > > separately in the JT UI to get better information. Many (dynamic)
> > > factors, such as network congestion, the number of mappers a reducer
> > > fetches from, and data skew w.r.t. the reducer's input keys, will
> > > affect this number.
> > >
> > > HTH,
> > > Amogh
> > >
> > > On 6/18/10 8:05 AM, "李钰" <car...@gmail.com> wrote:
> > >
> > > Hi Todd and Jeff,
> > >
> > > Thanks a lot for your discussion; it has been really helpful to me.
> > > I'd like to express my special appreciation for Todd's patient
> > > explanation, which helped me see the working mechanism of Sort more
> > > clearly. And Jeff, thank you for reminding me that Sort uses the
> > > TotalOrderPartitioner to do its partitioning.
> > > Based on your discussion I have updated my understanding as follows:
> > > the sorting happens on the map side, during the spill process of each
> > > map task; after that, the map outputs are partitioned by the
> > > TotalOrderPartitioner, which decides the input range of each reducer.
> > > Reducers fetch the map outputs assigned to them by the partitioner,
> > > merge them, and write the results into HDFS.
> > > Is this understanding right? Please correct me if you find any faults,
> > > thanks.
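(Jeff's hash-function/bucket analogy above can be sketched in a few lines. The key distribution below is invented, just to show how skewed input loads one reducer far more heavily than the others, even though the number of partitions equals the number of reducers:)

```java
import java.util.Arrays;

// Sketch of the bucket analogy: a hash partitioner guarantees each key goes
// to a fixed reducer, not that every reducer receives the same amount of data.
// The sample keys below are hypothetical.
public class SkewDemo {
    // Same formula Hadoop's HashPartitioner uses.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 4;
        long[] recordsPerReducer = new long[numReducers];
        // Skewed input: "hot" appears far more often than the other keys.
        String[] keys = {"hot", "hot", "hot", "hot", "hot", "hot", "warm", "cold"};
        for (String k : keys) {
            recordsPerReducer[partition(k, numReducers)]++;
        }
        // All six "hot" records land in a single bucket, so one reducer
        // gets at least six of the eight records.
        System.out.println(Arrays.toString(recordsPerReducer));
    }
}
```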
> > > If this understanding is right, then my question rolls back to the
> > > original one: since the scale of input data and the operations of each
> > > reduce task are the same, what may cause the execution times of the
> > > reduce tasks to differ? All nodes used in my experiment are on the
> > > same rack, and they are homogeneous.
> > > Any suggestion will be highly appreciated, thanks.
> > >
> > > Best Regards,
> > > Carp
> > >
> > > 2010/6/18 Todd Lipcon <t...@cloudera.com>
> > > > On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang <zjf...@gmail.com> wrote:
> > > > >
> > > > > Todd,
> > > > >
> > > > > Why is there a sort in the map task? The sorting here seems
> > > > > useless in my opinion.
> > > >
> > > > For map-only jobs there isn't. For jobs with a reduce phase,
> > > > typically the number of reduce tasks is smaller than the number of
> > > > map tasks, so parallelizing the sort on the mappers and just doing a
> > > > merge on the reducers is beneficial. Second, this allows the
> > > > combiner to run on the mapper by identifying when it has multiple
> > > > outputs for the same key. Third, this allows improved compression of
> > > > the map output (thus less intermediate data transfer) by putting
> > > > similar keys near each other (hopefully within the compression
> > > > window). Fourth, it kills two birds with one stone, since the
> > > > mappers already have to group outputs by partition.
> > > >
> > > > -Todd
> > > >
> > > > > On Thu, Jun 17, 2010 at 9:26 AM, Todd Lipcon <t...@cloudera.com> wrote:
> > > > > > On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > >
> > > > > > > Your understanding of Sort is not right. The key concept of
> > > > > > > Sort is the TotalOrderPartitioner. Actually, before the
> > > > > > > map-reduce job, the client side samples the input data to
> > > > > > > estimate its distribution.
> > > > > > > And the mapper does nothing; each reducer fetches its data
> > > > > > > according to the TotalOrderPartitioner. The data in each
> > > > > > > reducer is locally sorted, and the reducers themselves are
> > > > > > > ordered (r0 < r1 < r2 ...), so the overall result is sorted.
> > > > > >
> > > > > > The sorting happens on the map side, actually, during the spill
> > > > > > process. The mapper itself is an identity function, but the map
> > > > > > task code does perform a sort (on a <partition, key> tuple) as
> > > > > > originally described in this thread. Reducers just do a merge of
> > > > > > the mapper outputs.
> > > > > >
> > > > > > -Todd
> > > > > >
> > > > > > > On Thu, Jun 17, 2010 at 12:13 AM, 李钰 <car...@gmail.com> wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I'm doing some tuning of the sort benchmark of Hadoop; to be
> > > > > > > > more specific, running tests against the
> > > > > > > > org.apache.hadoop.examples.Sort class. Looking through the
> > > > > > > > source code, I think the map tasks take responsibility for
> > > > > > > > sorting the input data, and the reduce tasks just merge the
> > > > > > > > map outputs and write them into HDFS. But here is a question
> > > > > > > > I couldn't understand: the time cost of the reduce phase of
> > > > > > > > each reduce task, that is, of writing data into HDFS, differs
> > > > > > > > from task to task. Since the input data and the operations of
> > > > > > > > each reduce task are the same, what could cause the execution
> > > > > > > > times to differ? Is there anything wrong with my
> > > > > > > > understanding? Does anybody have any experience with this?
> > > > > > > > Badly need your help, thanks.
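(Todd's description of the spill-side sort and the reduce-side merge can be modeled roughly as follows; the record type, partition count, and sample keys are all made up for illustration:)

```java
import java.util.*;

// A toy model of the mechanism Todd describes: each map task sorts its
// output by (partition, key) during the spill, so a reduce task only has to
// collect and merge the keys of its own partition from each map's sorted run.
public class SpillSortDemo {
    static class Rec {
        final int partition;
        final String key;
        Rec(int partition, String key) { this.partition = partition; this.key = key; }
    }

    // Map side: sort a spill buffer on the (partition, key) tuple.
    static List<Rec> sortSpill(List<Rec> buffer) {
        List<Rec> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.<Rec>comparingInt(r -> r.partition)
                              .thenComparing((Rec r) -> r.key));
        return sorted;
    }

    // Reduce side: merge the keys for one partition across all map outputs.
    static List<String> mergeForPartition(int partition, List<List<Rec>> mapOutputs) {
        PriorityQueue<String> heap = new PriorityQueue<>();
        for (List<Rec> run : mapOutputs)
            for (Rec r : run)
                if (r.partition == partition) heap.add(r.key);
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) merged.add(heap.poll());
        return merged;
    }

    public static void main(String[] args) {
        List<Rec> map0 = sortSpill(Arrays.asList(new Rec(1, "cherry"), new Rec(0, "apple")));
        List<Rec> map1 = sortSpill(Arrays.asList(new Rec(0, "banana"), new Rec(1, "date")));
        System.out.println(mergeForPartition(1, Arrays.asList(map0, map1))); // prints [cherry, date]
    }
}
```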
> > > > > > > > Best Regards,
> > > > > > > > Carp
> > > > > > >
> > > > > > > --
> > > > > > > Best Regards
> > > > > > > Jeff Zhang
> > > > > >
> > > > > > --
> > > > > > Todd Lipcon
> > > > > > Software Engineer, Cloudera
> > > > >
> > > > > --
> > > > > Best Regards
> > > > > Jeff Zhang
> > > >
> > > > --
> > > > Todd Lipcon
> > > > Software Engineer, Cloudera
>
> --
> Best Regards
> Jeff Zhang
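(Jeff's point earlier in the thread, that the reducers themselves are ordered — r0 < r1 < r2 — comes from range partitioning on sampled split points. A rough sketch, with hand-picked split points standing in for what the input sampler would produce:)

```java
import java.util.*;

// Sketch of TotalOrderPartitioner-style range partitioning: every key in
// reducer 0 is less than every key in reducer 1, and so on, so concatenating
// the locally sorted reducer outputs yields a globally sorted result.
// The split points and keys below are invented for illustration.
public class RangePartitionDemo {
    static int partition(String key, String[] splitPoints) {
        int p = 0;
        while (p < splitPoints.length && key.compareTo(splitPoints[p]) >= 0) p++;
        return p;
    }

    public static void main(String[] args) {
        String[] splits = {"g", "p"}; // 3 partitions: [..g), [g..p), [p..)
        String[] keys = {"kiwi", "apple", "zebra", "mango", "fig"};
        List<List<String>> reducers =
            Arrays.asList(new ArrayList<String>(), new ArrayList<String>(), new ArrayList<String>());
        for (String k : keys) reducers.get(partition(k, splits)).add(k);
        for (List<String> r : reducers) Collections.sort(r); // each reducer sorts locally
        System.out.println(reducers); // prints [[apple, fig], [kiwi, mango], [zebra]]
    }
}
```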