Re: Performance tuning of sort

2010-06-17 Thread Jeff Zhang
Your understanding of Sort is not right. The key concept of Sort is
the TotalOrderPartitioner. Before the map-reduce job starts, the
client side samples the input data to estimate its distribution. The
mapper does nothing; each reducer fetches its data according to the
TotalOrderPartitioner. The data within each reducer is locally
sorted, and the reducers themselves are ordered (r0 < r1 < r2), so the
overall result data is sorted.
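The idea Jeff describes can be sketched in a few lines of plain Python (this is an illustrative simulation, not the Hadoop API): sample the input to pick split points, range-partition records across reducers, sort each partition locally, and the concatenation comes out globally sorted.

```python
import random

def total_order_sort(records, num_reducers, sample_size=100):
    """Simulate TotalOrderPartitioner-style sorting (illustrative only)."""
    # "Client side": sample the input to estimate its distribution.
    sample = sorted(random.sample(records, min(sample_size, len(records))))
    # Pick num_reducers - 1 split points from the sorted sample.
    step = len(sample) // num_reducers
    splits = [sample[(i + 1) * step] for i in range(num_reducers - 1)]

    # Partition: each record goes to the reducer whose key range contains it.
    buckets = [[] for _ in range(num_reducers)]
    for r in records:
        idx = sum(1 for s in splits if r >= s)  # linear scan; real code bisects
        buckets[idx].append(r)

    # Each reducer sorts locally; concatenating r0, r1, r2... is globally sorted.
    return [x for bucket in buckets for x in sorted(bucket)]

data = [random.randrange(10_000) for _ in range(1_000)]
assert total_order_sort(data, num_reducers=4) == sorted(data)
```

The sampling step is what keeps the reducers' key ranges roughly balanced; with a bad sample (or skewed keys), some buckets end up much larger than others.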



On Thu, Jun 17, 2010 at 12:13 AM, 李钰 car...@gmail.com wrote:
 Hi all,

 I'm doing some tuning of the sort benchmark of Hadoop. To be more specific,
 I'm running tests against the org.apache.hadoop.examples.Sort class. Looking
 through the source code, I think the map tasks take responsibility for
 sorting the input data, and the reduce tasks just merge the map outputs and
 write them into HDFS. But here I've got a question I can't answer: the time
 cost of the reduce phase of each reduce task, that is, writing data into
 HDFS, differs from task to task. Since the input data and operations of each
 reduce task are the same, what could cause the execution times to differ? Is
 there anything wrong with my understanding? Does anybody have any experience
 with this? I badly need your help, thanks.

 Best Regards,
 Carp




-- 
Best Regards

Jeff Zhang


Re: Performance tuning of sort

2010-06-17 Thread 李钰
Hi Jeff,

Thank you for your reply, it really helps! I'll take a careful look at
TotalOrderPartitioner.
BTW, in your opinion, where does the bottleneck of SORT lie, and which
parameters impact the performance of SORT the most? Looking forward to your
reply, thanks.

Dear all,

Any other comments? Thanks.

Best Regards,
Carp




Re: Performance tuning of sort

2010-06-17 Thread Todd Lipcon
On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang zjf...@gmail.com wrote:

 Your understanding of Sort is not right. The key concept of Sort is
 the TotalOrderPartitioner. Before the map-reduce job starts, the
 client side samples the input data to estimate its distribution. The
 mapper does nothing; each reducer fetches its data according to the
 TotalOrderPartitioner. The data within each reducer is locally
 sorted, and the reducers themselves are ordered (r0 < r1 < r2), so the
 overall result data is sorted.


The sorting happens on the map side, actually, during the spill process. The
mapper itself is an identity function, but the map task code does perform a
sort (on (partition, key) tuples), as originally described in this thread.
Reducers just do a merge of the mapper outputs.
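The flow Todd describes can be illustrated with a toy simulation (plain Python, not the Hadoop code): each map task sorts its output by (partition, key) as it spills, and each reducer then only has to merge the already-sorted runs it fetches.

```python
import heapq

NUM_REDUCERS = 2
partition = lambda key: hash(key) % NUM_REDUCERS  # stand-in partitioner

def map_task(records):
    """Identity map, then sort the output by (partition, key) as in the spill."""
    out = [(partition(k), k, v) for k, v in records]
    out.sort()  # the map-side sort Todd mentions
    return out

def reduce_task(reducer_id, map_outputs):
    """Merge the sorted runs destined for this reducer; no full re-sort needed."""
    runs = [[(k, v) for p, k, v in mo if p == reducer_id] for mo in map_outputs]
    return list(heapq.merge(*runs))  # each run is already key-sorted

maps = [map_task([("b", 1), ("a", 2)]), map_task([("a", 3), ("c", 4)])]
for r in range(NUM_REDUCERS):
    merged = reduce_task(r, maps)
    assert merged == sorted(merged)  # reducer input arrives in key order
```

Because each run filtered out of a map's output preserves the key order established by the spill sort, the reducer's `heapq.merge` is a cheap k-way merge rather than a full sort.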

-Todd









-- 
Todd Lipcon
Software Engineer, Cloudera


Re: Performance tuning of sort

2010-06-17 Thread Jeff Zhang
Todd,

Why is there a sort in the map task? The sorting there seems useless to me.



On Thu, Jun 17, 2010 at 9:26 AM, Todd Lipcon t...@cloudera.com wrote:

 The sorting happens on the map side, actually, during the spill process. The
 mapper itself is an identity function, but the map task code does perform a
 sort (on (partition, key) tuples), as originally described in this thread.
 Reducers just do a merge of the mapper outputs.

 -Todd









-- 
Best Regards

Jeff Zhang


Re: Performance tuning of sort

2010-06-17 Thread Jeff Zhang
The input of each reducer is not the same; it depends on the input data
distribution and the Partitioner.
And the running time of each reducer consists of three phases: copy,
sort, and reduce.


2010/6/18 李钰 car...@gmail.com:
 Hi Todd and Jeff,

 Thanks a lot for your discussion, it's really helpful to me. I'd like to
 express my special appreciation for Todd's patient explanation; you helped me
 see the working mechanism of SORT more clearly. And Jeff, thank you for
 reminding me that Sort uses TotalOrderPartitioner to do the partitioning.
 Based on your discussion I have updated my understanding as follows:
 The sorting happens on the map side during the spill process of each map
 task. After that, the map outputs are partitioned by the
 TotalOrderPartitioner, which decides the input range of each reducer.
 Reducers fetch the map outputs as decided by the partitioner, merge them, and
 write the results into HDFS.
 Is this understanding right? Please correct me if you find any faults,
 thanks.
 If it is right, then my question rolls back to the original one: since the
 scale of the input data and the operations of each reduce task are the same,
 what may cause the execution times of the reduce tasks to differ? All nodes
 used in my experiment are on the same rack, and they are homogeneous.
 Any suggestion will be highly appreciated, thanks.

 Best Regards,
 Carp

 2010/6/18 Todd Lipcon t...@cloudera.com

 On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang zjf...@gmail.com wrote:

  Todd,
 
  Why is there a sort in the map task? The sorting there seems useless to me.
 
 
 For map-only jobs there isn't. For jobs with reduce, typically the number of
 reduce tasks is smaller than the number of map tasks, so parallelizing the
 sort on the mappers and just doing a merge on the reducers is beneficial.
 Second, this allows the combiner to run on the mapper by identifying when it
 has multiple outputs for the same key. Third, this allows improved
 compression of the map output (thus less intermediate data transfer) by
 putting similar keys near each other (hopefully within the compression
 window). Fourth, it kills two birds with one stone, since the mappers
 already have to group outputs by partition.

 -Todd
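One of the benefits Todd lists, running a combiner on the mapper, falls out of the sort almost for free: once map output is key-sorted, equal keys sit adjacent, so they can be combined in a single linear pass. A small sketch in plain Python (summing values is a stand-in combine function, not anything from the Sort example):

```python
from itertools import groupby
from operator import itemgetter

def combine_sorted(map_output):
    """Collapse adjacent equal keys in key-sorted map output (like a combiner)."""
    return [(k, sum(v for _, v in grp))
            for k, grp in groupby(map_output, key=itemgetter(0))]

spill = sorted([("a", 1), ("b", 2), ("a", 3), ("b", 4)])  # key-sorted spill
assert combine_sorted(spill) == [("a", 4), ("b", 6)]
```

Without the sort, detecting duplicate keys would need a hash table over the whole spill; with it, `groupby` only ever looks at neighbors.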


 
 





-- 
Best Regards

Jeff Zhang


Re: Performance tuning of sort

2010-06-17 Thread Amogh Vasekar

Since the scale of input data and operations of each reduce task is the same, 
what may cause the execution time of reduce tasks different?

You should consider looking at the copy, shuffle, and reduce times separately
from the JT UI to get better information. Many (dynamic) factors, such as
network congestion, the number of mappers a reducer fetches from, and data
skew with respect to the input keys per reducer, will affect these numbers.

HTH,
Amogh





Re: Performance tuning of sort

2010-06-17 Thread 李钰
Hi Jeff and Amogh,

Thanks for your comments! In my understanding, in the partitioning phase
before spilling to disk, the threads divide the data into partitions
corresponding to the number of reducers, as described in the Definitive
Guide. So I think the scale of the input data should be the same for each
reducer. I wonder if I have any misunderstanding about this; please correct
me if you find any faults, thanks.

As to the reduce phases, I did check the times of shuffle, sort, and reduce
through the JT UI, but found them quite different for each reduce task. Some
tasks have a longer shuffle time but a shorter reduce time, while some have a
shorter shuffle time but a longer reduce time. I set the reducer number large
enough to let all reduce tasks run in parallel, and set the
mapred.reduce.slowstart.completed.maps parameter to 1.0 to let them start at
the same time, once all map tasks have finished; I think this should reduce
the impact of the network and of waiting for map tasks to finish during the
shuffle phase. Then why is the time spent in shuffle still quite different?
And since the reduce phase of a reduce task is just writing sorted data into
HDFS, why does the time of the reduce phase differ?

Anything wrong with my analysis? Any suggestions? Thanks a lot.

Dear all,

Any other comments? Thanks.

Best Regards,
Carp



Re: Performance tuning of sort

2010-06-17 Thread Jeff Zhang
The scale of each reducer's input depends on the Partitioner. You can think
of the Partitioner as a hash function and each reducer as a bucket, so you
cannot expect every bucket to hold the same number of items.

A skewed data distribution will make a few reducers cost much more time.
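Jeff's bucket analogy is easy to demonstrate in plain Python (again a simulation, not Hadoop code): hash-style partitioning gives roughly even buckets on uniform keys, but a skewed key distribution overloads whichever reducer owns the hot key.

```python
from collections import Counter

def bucket_sizes(keys, num_reducers):
    """Count how many records each reducer ('bucket') would receive."""
    c = Counter(hash(k) % num_reducers for k in keys)
    return [c[r] for r in range(num_reducers)]

uniform = list(range(10_000))
skewed = [0] * 9_000 + list(range(1_000))  # 90% of records share one key

even = bucket_sizes(uniform, 4)
lopsided = bucket_sizes(skewed, 4)
# The reducer that owns key 0 gets the lion's share under skew.
assert max(lopsided) >= 9_000
assert max(even) < max(lopsided)
```

The reducer holding the largest bucket dominates the job's tail latency, which is one plausible source of the uneven reduce times discussed in this thread.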


