Re: Performance tuning of sort
Your understanding of Sort is not quite right. The key concept in Sort is the TotalOrderPartitioner. Before the map-reduce job runs, the client side samples the input data to estimate its key distribution. The mapper does nothing (it is an identity function); each reducer fetches its range of the data according to the TotalOrderPartitioner. The data within each reducer is locally sorted, and the reducers themselves are ordered (r0 < r1 < r2), so the overall output is sorted.

On Thu, Jun 17, 2010 at 12:13 AM, 李钰 <car...@gmail.com> wrote:
> Hi all, I'm doing some tuning of the sort benchmark of Hadoop -- more specifically, running tests against the org.apache.hadoop.examples.Sort class. Looking through the source code, I think the map tasks are responsible for sorting the input data, and the reduce tasks just merge the map outputs and write them into HDFS. But here I've got a question I can't understand: the time cost of the reduce phase of each reduce task, that is, writing data into HDFS, differs from task to task. Since the input data and the operations of each reduce task are the same, what could cause the execution times to differ? Is there anything wrong with my understanding? Does anybody have any experience with this? I badly need your help, thanks. Best Regards, Carp

-- Best Regards, Jeff Zhang
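[Editor's sketch] The sampling-then-partition setup Jeff describes corresponds roughly to what the examples Sort driver does with the old mapred API. This is a hedged sketch, not the exact Sort.java source; the sampling parameters and the partition-file path are illustrative:

```java
// Sketch of a TotalOrderPartitioner job setup (Hadoop 0.20-era mapred API).
// Sampling rate, sample count, and split count below are illustrative values.
JobConf jobConf = new JobConf(conf, Sort.class);

// Sample ~10% of records, up to 10000 samples from at most 10 splits,
// to estimate the key distribution before the job starts.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

// Write the sampled range boundaries where every map task can read them.
Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
Path partitionFile = new Path(inputDir, "_sortPartitioning");
TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
InputSampler.writePartitionFile(jobConf, sampler);

// Each map output record is routed to the reducer whose key range
// contains it, so reducer outputs concatenate into a globally sorted file.
jobConf.setPartitionerClass(TotalOrderPartitioner.class);
```

With this in place the reducer index itself encodes the sort order: reducer 0 receives the smallest keys, reducer N-1 the largest.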
Re: Performance tuning of sort
Hi Jeff, thank you very much for your reply -- it really helps! I'll take a careful look at TotalOrderPartitioner. BTW, in your opinion, where does the bottleneck lie in Sort, and which parameters impact its performance the most? Looking forward to your reply, thanks. Dear all, any other comments? Thanks. Best Regards, Carp

2010/6/17 Jeff Zhang <zjf...@gmail.com>:
> Your understanding of Sort is not right. The key concept of Sort is the TotalOrderPartitioner. [...]
Re: Performance tuning of sort
On Thu, Jun 17, 2010 at 12:43 AM, Jeff Zhang <zjf...@gmail.com> wrote:
> Your understanding of Sort is not right. The key concept of Sort is the TotalOrderPartitioner. [...] And the mapper do nothing, each reducer will fetch its data according the TotalOrderPartitioner.

The sorting actually happens on the map side, during the spill process. The mapper itself is an identity function, but the map task code does perform a sort (on a (partition, key) tuple), as originally described in this thread. Reducers just do a merge of the mapper outputs.

-Todd

-- Todd Lipcon, Software Engineer, Cloudera
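[Editor's sketch] The "(partition, key) tuple" ordering Todd mentions can be simulated in a few lines of plain Java (this is an illustration, not Hadoop's actual spill code): sorting the in-memory buffer by partition first and key second produces a spill file in which each reducer's slice is contiguous and already internally sorted.

```java
import java.util.*;

// Toy simulation of the map-side spill sort: records are ordered by
// (partition, key), so one comparison sort yields both the per-reducer
// grouping and the per-reducer key order.
public class SpillSort {
    // A record destined for reducer `partition`, carrying `key`.
    static final class Rec {
        final int partition;
        final String key;
        Rec(int partition, String key) { this.partition = partition; this.key = key; }
    }

    // Sort the in-memory buffer by partition, then by key within a partition.
    static List<Rec> spillSort(List<Rec> buffer) {
        buffer.sort(Comparator.comparingInt((Rec r) -> r.partition)
                              .thenComparing(r -> r.key));
        return buffer;
    }

    public static void main(String[] args) {
        List<Rec> buf = new ArrayList<>(Arrays.asList(
            new Rec(1, "pear"), new Rec(0, "fig"),
            new Rec(1, "apple"), new Rec(0, "date")));
        // After the sort, partition 0's records come first, each
        // partition's records in key order -- ready to spill sequentially.
        for (Rec r : spillSort(buf))
            System.out.println(r.partition + " " + r.key);
    }
}
```

Because each spilled segment is sorted, the reduce side only has to merge already-sorted streams, which is exactly the division of labor Todd describes.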
Re: Performance tuning of sort
Todd, why is there sorting in the map task? The sorting there seems useless to me.

On Thu, Jun 17, 2010 at 9:26 AM, Todd Lipcon <t...@cloudera.com> wrote:
> The sorting happens on the map side, actually, during the spill process. The mapper itself is an identity function, but the map task code does perform a sort (on a (partition, key) tuple), as originally described in this thread. Reducers just do a merge of mapper outputs. [...]

-- Best Regards, Jeff Zhang
Re: Performance tuning of sort
The input of each reducer is not the same; it depends on the input data distribution and the Partitioner. And the running time of each reducer consists of three phases: copy, sort, and reduce.

2010/6/18 李钰 <car...@gmail.com>:
> Hi Todd and Jeff, thanks a lot for your discussion; it's really helpful to me. I'd especially like to thank Todd for his patient explanation -- it helped me see the working mechanism of Sort much more clearly. And Jeff, thank you for reminding me that Sort uses TotalOrderPartitioner to do the partitioning. Based on your discussion I have updated my understanding as follows: the sorting happens on the map side during the spill process of each map task; after that, the map outputs are partitioned by the TotalOrderPartitioner, which decides the input range of each reducer. Reducers fetch the map outputs as decided by the partitioner, merge them, and write the results into HDFS. Is this understanding right? Please correct me if you find any faults, thanks. If it is right, then my question rolls back to the original one: since the scale of the input data and the operations of each reduce task are the same, what could cause the execution times of the reduce tasks to differ? All nodes used in my experiment are on the same rack, and they are homogeneous. Any suggestion will be highly appreciated, thanks. Best Regards, Carp
>
> 2010/6/18 Todd Lipcon <t...@cloudera.com>:
>> On Thu, Jun 17, 2010 at 9:37 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>> Todd, why is there sorting in the map task? The sorting there seems useless to me.
>>
>> For map-only jobs there isn't. For jobs with a reduce phase there are several reasons. First, the number of reduce tasks is typically smaller than the number of map tasks, so parallelizing the sort across the mappers and doing only a merge on the reducers is beneficial. Second, the sort allows the combiner to run on the mapper, by identifying when the mapper has multiple outputs for the same key. Third, it allows improved compression of the map output (and thus less intermediate data transfer) by putting similar keys near each other, hopefully within the compression window. Fourth, it kills two birds with one stone, since the mappers already have to group outputs by partition. -Todd

-- Best Regards, Jeff Zhang
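[Editor's sketch] Todd's second point -- that sorting is what lets the combiner work -- can be illustrated with a toy single-pass fold in plain Java (not Hadoop code): once the map output is sorted, all records for a key are adjacent, so a combiner can merge them in one streaming pass without any hash table.

```java
import java.util.*;

// Toy combiner pass over SORTED (word, count) pairs: adjacent records with
// equal keys are folded into one record, shrinking the data shipped to
// the reducers.
public class CombinerPass {
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> sorted) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(e.getKey()))
                // Same key as the previous output record: fold counts together.
                out.set(last, Map.entry(e.getKey(), out.get(last).getValue() + e.getValue()));
            else
                out.add(Map.entry(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Sorted map output: the two "a" records are adjacent, so they
        // collapse into a single ("a", 2) record before the shuffle.
        List<Map.Entry<String, Integer>> sorted = List.of(
            Map.entry("a", 1), Map.entry("a", 1), Map.entry("b", 1));
        System.out.println(combine(sorted));
    }
}
```

Without the sort, equal keys could be scattered through the buffer and this single forward pass would miss them -- which is why the combiner optimization piggybacks on the spill sort.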
Re: Performance tuning of sort
> Since the scale of input data and operations of each reduce task is the same, what may cause the execution time of reduce tasks different?

You should consider looking at the copy, shuffle, and reduce times separately in the JT UI to get better information. Many (dynamic) factors -- network congestion, the number of mappers a reducer is fetching from, data skew w.r.t. the input keys of a reducer, etc. -- will affect these numbers.

HTH, Amogh

On 6/18/10 8:05 AM, 李钰 <car...@gmail.com> wrote:
> Hi Todd and Jeff, thanks a lot for your discussion; it's really helpful to me. [...] If this understanding is right, then my question rolls back to the original one: since the scale of the input data and the operations of each reduce task are the same, what may cause the execution times of the reduce tasks to differ? All nodes used in my experiment are on the same rack, and they are homogeneous. Any suggestion will be highly appreciated, thanks. Best Regards, Carp
Re: Performance tuning of sort
Hi Jeff and Amogh, thanks for your comments! In my understanding, in the partitioning phase before spilling to disk, the spill threads divide the data into partitions corresponding to the number of reducers, as described in the Definitive Guide. So I think the scale of the input data should be the same for each reducer. I wonder if I have misunderstood anything here; please correct me if you find any faults, thanks. As to the reduce phases: I did check the shuffle, sort, and reduce times through the JT UI, but found them quite different for each reduce task. Some tasks have a longer shuffle time but a shorter reduce time, while others have a shorter shuffle time but a longer reduce time. I set the reducer number large enough for all reduce tasks to run in parallel, and set the mapred.reduce.slowstart.completed.maps parameter to 1.0 so that they all start at the same time, once all map tasks have finished; I think this should reduce the impact of the network and of waiting for map tasks to finish during the shuffle phase. So why is the time spent in shuffle still so different? And since the reduce phase of a reduce task is just writing sorted data into HDFS, why does the reduce-phase time differ? Is anything wrong with my analysis? Any suggestions? Thanks a lot. Dear all, any other comments? Thanks. Best Regards, Carp

On June 18, 2010 at 11:39 AM, Amogh Vasekar <am...@yahoo-inc.com> wrote:
> You should consider looking at the copy, shuffle and reduce times separately from JT UI to get better info. Many (dynamic) considerations like network congestion, number of mappers reducer is fetching from, data skew wrt input keys to reducer etc will affect this number. [...]
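[Editor's sketch] The experiment setup Carp describes corresponds roughly to the following old-style JobConf settings; this is illustrative, and `numReduceSlotsInCluster` is a hypothetical placeholder for his cluster's reduce-slot count:

```java
// Sketch of Carp's tuning (Hadoop 0.20-era property names, as quoted above).
JobConf conf = new JobConf(Sort.class);

// Don't launch any reducer until 100% of the maps have completed, so all
// shuffles start at the same moment:
conf.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);

// Few enough reduce tasks that they all run in a single parallel wave
// (numReduceSlotsInCluster is a placeholder for the cluster's capacity):
conf.setNumReduceTasks(numReduceSlotsInCluster);
```

Even with identical start times, Jeff's point below still applies: what each reducer receives is decided by the partitioner, not by the scheduler.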
Re: Performance tuning of sort
The scale of each reducer's input depends on the Partitioner. You can think of the Partitioner as a hash function and each reducer as a bucket, so you cannot expect every bucket to hold the same number of items. A skewed data distribution will make a few reducers cost much more time than the others.

2010/6/18 李钰 <car...@gmail.com>:
> Hi Jeff and Amogh, thanks for your comments! In my understanding, in the partitioning phase before spilling to disk, the threads will divide the data into partitions corresponding to the number of reducers, as described in the Definitive Guide. So I think the scale of input data should be the same for each reducer. [...]
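[Editor's sketch] Jeff's hash-function/bucket analogy is easy to demonstrate in plain Java. The bucket formula below matches Hadoop's default HashPartitioner, `(hash & Integer.MAX_VALUE) % numPartitions`; the skewed input data is made up for illustration. (The Sort benchmark sidesteps this by using TotalOrderPartitioner with sampled range boundaries, but range partitioning can still be uneven if the sample misestimates the distribution.)

```java
import java.util.*;

// Toy demo of reducer skew under hash partitioning: identical keys always
// land in the same bucket, so a "hot" key overloads one reducer.
public class SkewDemo {
    // Count how many records each of `numReducers` buckets would receive,
    // using the same formula as Hadoop's default HashPartitioner.
    static int[] bucketSizes(List<String> keys, int numReducers) {
        int[] sizes = new int[numReducers];
        for (String k : keys)
            sizes[(k.hashCode() & Integer.MAX_VALUE) % numReducers]++;
        return sizes;
    }

    public static void main(String[] args) {
        // Skewed input: 90 records share one hot key, 10 records are distinct.
        List<String> keys = new ArrayList<>(Collections.nCopies(90, "hot"));
        for (int i = 0; i < 10; i++) keys.add("cold" + i);
        // One bucket gets all 90 "hot" records; that reducer dominates
        // the job's running time.
        System.out.println(Arrays.toString(bucketSizes(keys, 4)));
    }
}
```

This is the mechanism behind Jeff's closing remark: with skewed keys, a few reducers receive far more data and finish last.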