[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions
    [ https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12905700#action_12905700 ]

Ning Zhang commented on HIVE-1467:
----------------------------------

As discussed with Joydeep and Ashish, it seems we should use the "distribute by" mechanism rather than "cluster by" to avoid sorting on the reducer side. The difference between them is that "distribute by" only sets the MapReduce partition columns to the dynamic partition columns, while "cluster by" additionally sets the key columns to the dynamic partition columns. So I think we can support two modes of reducer-side DP, with tradeoffs:

-- distribute by mode: no sorting, but reducers have to keep all files open during the DP insert. A good choice when a large amount of data is passed from mappers to reducers.

-- cluster by mode: sorting by the DP columns, but we can close a DP file as soon as FileSinkOperator sees a different DP column value. A good choice when the total data size is not that large but a large number of DPs are generated.

> dynamic partitioning should cluster by partitions
> --------------------------------------------------
>
>                 Key: HIVE-1467
>                 URL: https://issues.apache.org/jira/browse/HIVE-1467
>             Project: Hadoop Hive
>          Issue Type: Improvement
>            Reporter: Joydeep Sen Sarma
>            Assignee: Namit Jain
>
> (based on internal discussion with Ning). Dynamic partitioning should offer a mode where it clusters data by partition before writing out to each partition. This will reduce the number of files. Details:
> 1. always use a reducer stage
> 2. mapper sends to reducer based on the partitioning columns, i.e. reducer = f(partition-cols)
> 3. f() can be made somewhat smart to:
>    a. spread large partitions across multiple reducers - each mapper can maintain the row count seen per partition - and then apply (whenever it sees a new row for a partition):
>       * reducer = (row-count / 64K) % numReducers
>       Small partitions always go to one reducer; the larger the partition, the more reducers it spreads over. This prevents one reducer from becoming a bottleneck writing out one partition.
>    b. this still leaves the issue of a very large number of splits (64K rows from each of 10K mappers is pretty large). For this, one can apply a slight modification:
>       * reducer = (mapper-id / 1024 + row-count / 64K) % numReducers
>       i.e. the first 1024 mappers always send the first 64K rows for one partition to the same reducer, the next 1024 send them to the next one, and so on.
> The constants 1024 and 64K are used just as an example; I don't know what the right numbers are. It's also clear that this is a case where we need Hadoop to do only partitioning (and no sorting). That will be a useful feature to have in Hadoop and will reduce the overhead due to reducers.
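A minimal sketch of the smarter f() from the description above, written as a Hadoop Partitioner. Assumptions, not part of any Hive patch: the class name and Text key/value types are illustrative; the base hash term is implied by step 2 (reducer = f(partition-cols)) but not spelled out in the formula, so a plain hash of the DP value is used here; the mapper id is read from the "mapred.task.partition" job setting; and 1024 and 64K are the example constants from the description.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class DynamicPartitionSpreader extends Partitioner<Text, Text>
        implements Configurable {

      private static final int ROWS_PER_BUCKET = 64 * 1024; // the "64K" example constant
      private static final int MAPPERS_PER_GROUP = 1024;    // the "1024" example constant

      // Rows seen so far per dynamic-partition value, local to this mapper.
      private final Map<String, Long> rowCounts = new HashMap<String, Long>();

      private Configuration conf;
      private int mapperId;

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
        // Assumption: the task's index within the job serves as the "mapper-id".
        mapperId = conf.getInt("mapred.task.partition", 0);
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public int getPartition(Text dpValue, Text row, int numReducers) {
        String key = dpValue.toString();
        Long seen = rowCounts.get(key);
        long count = (seen == null) ? 0L : seen.longValue();
        rowCounts.put(key, count + 1);

        // Base term: reducer = f(partition-cols) (step 2 of the description);
        // hashing the DP value makes different partitions start out on
        // different reducers.
        int base = key.hashCode() & Integer.MAX_VALUE;

        // Offset terms: (mapper-id / 1024 + row-count / 64K). A large partition
        // spills onto more reducers as its local row count grows, and mapper
        // groups are staggered so 10K mappers don't all send their first 64K
        // rows for a partition to the same reducer.
        long offset = mapperId / MAPPERS_PER_GROUP + count / ROWS_PER_BUCKET;

        return (int) ((base + offset) % numReducers);
      }
    }

Note that the row counts are per-mapper, so small partitions land on one reducer per mapper group without any coordination between tasks; only partitions large enough to cross the 64K threshold inside a single mapper fan out further.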
[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions
    [ https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12903565#action_12903565 ]

Ning Zhang commented on HIVE-1467:
----------------------------------

@Joydeep, I filed HIVE-1602 for the list partitioning. It particularly tackles the problem of skew in partitions.
[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions
    [ https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12903562#action_12903562 ]

Joydeep Sen Sarma commented on HIVE-1467:
-----------------------------------------

@Ning - what about skew?
[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions
    [ https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12903533#action_12903533 ]

Ning Zhang commented on HIVE-1467:
----------------------------------

As discussed with Ashish offline, it seems appropriate to support list partitioning now, if we can sort on the partition column and distribute the rows to the reducers to write. Will open a new JIRA and comment there.
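A minimal sketch of the sorted write path this comment and the "cluster by" mode above rely on. This is not Hive's FileSinkOperator: the class name, the FileWriter stand-in, and the data.txt file name are illustrative assumptions. The point it shows is that once rows arrive at the reducer sorted by their dynamic-partition value, only one partition file needs to be open at a time; the current file can be closed as soon as the DP value changes.

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;

    public class SortedDynamicPartitionWriter {

      private final File baseDir;
      private String currentDpValue;    // DP value whose file is currently open
      private FileWriter currentWriter; // stand-in for Hive's record writer

      public SortedDynamicPartitionWriter(File baseDir) {
        this.baseDir = baseDir;
      }

      public void processRow(String dpValue, String row) throws IOException {
        if (!dpValue.equals(currentDpValue)) {
          // Rows are sorted by DP value, so the previous partition is
          // complete and its file can be closed immediately.
          close();
          File partitionDir = new File(baseDir, dpValue);
          partitionDir.mkdirs();
          currentWriter = new FileWriter(new File(partitionDir, "data.txt"));
          currentDpValue = dpValue;
        }
        currentWriter.write(row);
        currentWriter.write('\n');
      }

      public void close() throws IOException {
        if (currentWriter != null) {
          currentWriter.close();
          currentWriter = null;
        }
      }
    }

Compare this with the "distribute by" mode, where the same writer would need a map from every DP value to an open file handle for the whole insert; that is the tradeoff between sorting cost and open-file count discussed in the first comment.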