[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions

2010-09-02 Thread Ning Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12905700#action_12905700
 ] 

Ning Zhang commented on HIVE-1467:
--

As discussed with Joydeep and Ashish, it seems we should use the "distribute 
by" mechanism rather than "cluster by" to avoid sorting at the reducer side. 
The difference between them is "distribute by" only have MapReduce partition 
columns set to be the Dyanmic partition columns, and "cluster by" will 
additionally set "key columns" as the dynamic partition columns as well.

So I think we can use 2 mode of reducer-side DP with tradeoffs:
  -- distribute by mode: no sorting but reducers have to keep all files open 
during DP insert. Good choice when there are large amount of data passed from 
mappers to reducers.
  -- cluster by mode: sorting by the DP columns, but we can close a DP file 
once FileSinkOperator sees a dfferent DP column value. Good choice when total 
data size is not that large but there are large number of DPs generated.  

> dynamic partitioning should cluster by partitions
> -
>
> Key: HIVE-1467
> URL: https://issues.apache.org/jira/browse/HIVE-1467
> Project: Hadoop Hive
>  Issue Type: Improvement
>Reporter: Joydeep Sen Sarma
>Assignee: Namit Jain
>
> (based on internal discussion with Ning). Dynamic partitioning should offer a 
> mode where it clusters data by partition before writing out to each 
> partition. This will reduce number of files. Details:
> 1. always use reducer stage
> 2. mapper sends to reducer based on partitioning column. ie. reducer = 
> f(partition-cols)
> 3. f() can be made somewhat smart to:
>a. spread large partitions across multiple reducers - each mapper can 
> maintain row count seen per partition - and then apply (whenever it sees a 
> new row for a partition): 
>* reducer = (row count / 64k) % numReducers 
>Small partitions always go to one reducer. the larger the partition, 
> the more the reducers. this prevents one reducer becoming bottleneck writing 
> out one partition
>b. this still leaves the issue of very large number of splits. (64K rows 
> from 10K mappers is pretty large). for this one can apply one slight 
> modification:
>* reducer = (mapper-id/1024 + row-count/64k) % numReducers
>ie. - the first 1000 mappers always send the first 64K rows for one 
> partition to the same reducer. the next 1000 send it to the next one. and so 
> on.
> the constants 1024 and 64k are used just as an example. i don't know what the 
> right numbers are. it's also clear that this is a case where we need hadoop 
> to do only partitioning (and no sorting). this will be a useful feature to 
> have in hadoop. that will reduce the overhead due to reducers.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.



[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions

2010-08-27 Thread Ning Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12903565#action_12903565
 ] 

Ning Zhang commented on HIVE-1467:
--

@Joydeep, I filed HIVE-1602 for the list partitioning. This is particularly 
tackling the problem of skew in partitions. 

> dynamic partitioning should cluster by partitions
> -
>
> Key: HIVE-1467
> URL: https://issues.apache.org/jira/browse/HIVE-1467
> Project: Hadoop Hive
>  Issue Type: Improvement
>Reporter: Joydeep Sen Sarma
>Assignee: Namit Jain
>
> (based on internal discussion with Ning). Dynamic partitioning should offer a 
> mode where it clusters data by partition before writing out to each 
> partition. This will reduce number of files. Details:
> 1. always use reducer stage
> 2. mapper sends to reducer based on partitioning column. ie. reducer = 
> f(partition-cols)
> 3. f() can be made somewhat smart to:
>a. spread large partitions across multiple reducers - each mapper can 
> maintain row count seen per partition - and then apply (whenever it sees a 
> new row for a partition): 
>* reducer = (row count / 64k) % numReducers 
>Small partitions always go to one reducer. the larger the partition, 
> the more the reducers. this prevents one reducer becoming bottleneck writing 
> out one partition
>b. this still leaves the issue of very large number of splits. (64K rows 
> from 10K mappers is pretty large). for this one can apply one slight 
> modification:
>* reducer = (mapper-id/1024 + row-count/64k) % numReducers
>ie. - the first 1000 mappers always send the first 64K rows for one 
> partition to the same reducer. the next 1000 send it to the next one. and so 
> on.
> the constants 1024 and 64k are used just as an example. i don't know what the 
> right numbers are. it's also clear that this is a case where we need hadoop 
> to do only partitioning (and no sorting). this will be a useful feature to 
> have in hadoop. that will reduce the overhead due to reducers.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.



[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions

2010-08-27 Thread Joydeep Sen Sarma (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12903562#action_12903562
 ] 

Joydeep Sen Sarma commented on HIVE-1467:
-

@Ning - what about skew?

> dynamic partitioning should cluster by partitions
> -
>
> Key: HIVE-1467
> URL: https://issues.apache.org/jira/browse/HIVE-1467
> Project: Hadoop Hive
>  Issue Type: Improvement
>Reporter: Joydeep Sen Sarma
>Assignee: Namit Jain
>
> (based on internal discussion with Ning). Dynamic partitioning should offer a 
> mode where it clusters data by partition before writing out to each 
> partition. This will reduce number of files. Details:
> 1. always use reducer stage
> 2. mapper sends to reducer based on partitioning column. ie. reducer = 
> f(partition-cols)
> 3. f() can be made somewhat smart to:
>a. spread large partitions across multiple reducers - each mapper can 
> maintain row count seen per partition - and then apply (whenever it sees a 
> new row for a partition): 
>* reducer = (row count / 64k) % numReducers 
>Small partitions always go to one reducer. the larger the partition, 
> the more the reducers. this prevents one reducer becoming bottleneck writing 
> out one partition
>b. this still leaves the issue of very large number of splits. (64K rows 
> from 10K mappers is pretty large). for this one can apply one slight 
> modification:
>* reducer = (mapper-id/1024 + row-count/64k) % numReducers
>ie. - the first 1000 mappers always send the first 64K rows for one 
> partition to the same reducer. the next 1000 send it to the next one. and so 
> on.
> the constants 1024 and 64k are used just as an example. i don't know what the 
> right numbers are. it's also clear that this is a case where we need hadoop 
> to do only partitioning (and no sorting). this will be a useful feature to 
> have in hadoop. that will reduce the overhead due to reducers.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.



[jira] Commented: (HIVE-1467) dynamic partitioning should cluster by partitions

2010-08-27 Thread Ning Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/HIVE-1467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12903533#action_12903533
 ] 

Ning Zhang commented on HIVE-1467:
--

As discussed with Ashish offline. It seems appropriate to suport list 
partitioning now if we can sort the partition column and distribute the rows to 
the reducer to write. Will open a new JIRA and make comments on there. 

> dynamic partitioning should cluster by partitions
> -
>
> Key: HIVE-1467
> URL: https://issues.apache.org/jira/browse/HIVE-1467
> Project: Hadoop Hive
>  Issue Type: Improvement
>Reporter: Joydeep Sen Sarma
>Assignee: Namit Jain
>
> (based on internal discussion with Ning). Dynamic partitioning should offer a 
> mode where it clusters data by partition before writing out to each 
> partition. This will reduce number of files. Details:
> 1. always use reducer stage
> 2. mapper sends to reducer based on partitioning column. ie. reducer = 
> f(partition-cols)
> 3. f() can be made somewhat smart to:
>a. spread large partitions across multiple reducers - each mapper can 
> maintain row count seen per partition - and then apply (whenever it sees a 
> new row for a partition): 
>* reducer = (row count / 64k) % numReducers 
>Small partitions always go to one reducer. the larger the partition, 
> the more the reducers. this prevents one reducer becoming bottleneck writing 
> out one partition
>b. this still leaves the issue of very large number of splits. (64K rows 
> from 10K mappers is pretty large). for this one can apply one slight 
> modification:
>* reducer = (mapper-id/1024 + row-count/64k) % numReducers
>ie. - the first 1000 mappers always send the first 64K rows for one 
> partition to the same reducer. the next 1000 send it to the next one. and so 
> on.
> the constants 1024 and 64k are used just as an example. i don't know what the 
> right numbers are. it's also clear that this is a case where we need hadoop 
> to do only partitioning (and no sorting). this will be a useful feature to 
> have in hadoop. that will reduce the overhead due to reducers.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.