[ 
https://issues.apache.org/jira/browse/TEZ-3865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252182#comment-16252182
 ] 

Gopal V edited comment on TEZ-3865 at 11/14/17 9:14 PM:
--------------------------------------------------------

[~rohini]: is there some overlap between this idea and TEZ-3209 ?


was (Author: gopalv):
[~rohini]: is there some overlap between this idea and TEZ-3269 ?

> A new vertex manager to partition data for STORE
> ------------------------------------------------
>
>                 Key: TEZ-3865
>                 URL: https://issues.apache.org/jira/browse/TEZ-3865
>             Project: Apache Tez
>          Issue Type: New Feature
>            Reporter: Rohini Palaniswamy
>
> Restricting the number of files in the output is a very common use case. In
> Pig, users currently add an ORDER BY, GROUP BY or DISTINCT with the required
> parallelism before the STORE to achieve it. All of these operations add
> unnecessary processing overhead. It would be ideal if the STORE clause
> supported the PARALLEL clause and the partitioning of the data were handled
> in a simpler and more efficient manner.
> Partitioning of the data can be achieved using a very efficient vertex
> manager as described below. I am going to call it PartitionVertexManager
> (PVM) for now until someone proposes a better name. I will explain using Pig
> examples, but the logic is the same for Hive as well.
> There are multiple cases to consider when storing:
> 1) No partitions
>        - Data is stored into a single directory using FileOutputFormat
> implementations.
> 2) Partitions
>        - Data is stored into multiple partitions. This is the case of static
> or dynamic partitioning with HCat.
> 3) HBase
>        - I have kind of forgotten exactly what my thoughts were on this for
> storing to multiple regions. Will update once I remember.
> Let us consider the script below with pig.exec.bytes.per.reducer (a setting
> that is usually translated to tez.shuffle-vertex-manager.desired-task-input-size
> with ShuffleVertexManager) set to 1G.
> {code}
> A = LOAD 'data' ....;
> B = GROUP A BY $0 PARALLEL 1000;
> C = FOREACH B GENERATE group, COUNT(A.a), SUM(A.b), ..;
> STORE C INTO 'output' USING SomeStoreFunc() PARALLEL 20;
> {code}
> The implementation will have 3 vertices:
> v1 - LOAD vertex
> v2 - GROUP BY vertex
> v3 - STORE vertex
> PVM will be used on v3. It is going to be similar to ShuffleVertexManager but
> with some differences. The main difference is that the source vertex does not
> care about the parallelism of the destination vertex, and the number of
> partitioned outputs it produces does not depend on it.
> 1) Case of no partitions
>    Each task in vertex v2 will produce a single partition output (no
> Partitioner is required). The PVM will bucket this single-partition data from
> the 1000 source tasks into multiple destination tasks of v3, trying to keep
> 1G per task with a maximum of 20 tasks (auto parallelism).
>    
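>    A rough sketch of the kind of size-based packing described above, assuming
> each v2 task reports the size of its single-partition output. The class and
> method names are hypothetical, and this is plain Java rather than the actual
> Tez VertexManager API.
> {code}
> // Hypothetical sketch: pack the single-partition outputs of the v2 tasks
> // into v3 tasks of roughly desiredBytesPerTask each, never exceeding maxTasks.
> import java.util.ArrayList;
> import java.util.List;
>
> public class SinglePartitionGrouper {
>
>     /** Returns, per v3 task, the list of v2 task indexes whose output it reads. */
>     public static List<List<Integer>> group(long[] srcOutputBytes,
>                                             long desiredBytesPerTask,
>                                             int maxTasks) {
>         long total = 0;
>         for (long bytes : srcOutputBytes) {
>             total += bytes;
>         }
>         // Target size per v3 task: the desired size (e.g. 1G), or more if
>         // 1G per task would need more tasks than the cap (e.g. PARALLEL 20).
>         long perTask = Math.max(desiredBytesPerTask,
>                 (total + maxTasks - 1) / maxTasks);
>
>         List<List<Integer>> v3Tasks = new ArrayList<>();
>         List<Integer> current = new ArrayList<>();
>         long currentBytes = 0;
>         for (int src = 0; src < srcOutputBytes.length; src++) {
>             current.add(src);
>             currentBytes += srcOutputBytes[src];
>             // Closing a bucket only after it reaches perTask keeps the total
>             // number of v3 tasks at or below maxTasks.
>             if (currentBytes >= perTask) {
>                 v3Tasks.add(current);
>                 current = new ArrayList<>();
>                 currentBytes = 0;
>             }
>         }
>         if (!current.isEmpty()) {
>             v3Tasks.add(current);
>         }
>         return v3Tasks;
>     }
> }
> {code}
>    With the script above and purely illustrative numbers: if the 1000 v2 tasks
> produced 8G in total, this packing would create about 8 v3 tasks of roughly 1G
> each; at 50G the 20-task cap would apply and each v3 task would read about
> 2.5G.
>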
> 2) Partitions
>    Let us say the table has 2 partition keys (dt and region). Since there
> could be any number of regions for a given date, we will use the store
> parallelism as the upper limit on the number of partitions, i.e. a
> HashPartitioner with numReduceTasks as 20 and (dt, region) as the partition
> key. If there are only 5 regions, then each task of v2 will produce 5
> partitions (with the remaining 15 being empty) if there is no hash collision.
> If there are 30 regions, then each task of v2 will produce 20 partitions.
>    
>    When it groups, the PVM will try to group all Partition0 segments as much
> as possible into one v3 task. Based on skew, they could end up in more tasks,
> i.e. there is no restriction that one partition must go to a single reducer
> task. Doing this avoids having to open multiple ORC files in one task when
> doing dynamic partitioning, and is very efficient, reducing namespace usage
> even further while keeping file sizes more uniform.
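>
>    A similarly rough sketch for the partitioned case, again with hypothetical
> names and plain Java rather than the actual Tez API. The partition index just
> mirrors the usual hashCode-modulo scheme of a HashPartitioner over (dt, region)
> with 20 partitions, and the packing shows how the segments of one partition,
> coming from all v2 tasks, could be grouped into v3 tasks of ~1G, with a skewed
> partition spilling over into more than one task.
> {code}
> // Hypothetical sketch for the partitioned case; not actual Tez/Pig/Hive API.
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Objects;
>
> public class PartitionedStoreSketch {
>
>     /** Partition index for a record, hashCode-modulo style (e.g. numPartitions = 20). */
>     public static int partitionFor(String dt, String region, int numPartitions) {
>         int hash = Objects.hash(dt, region);
>         return (hash & Integer.MAX_VALUE) % numPartitions;
>     }
>
>     /**
>      * Packs the segments of one partition index (one size per v2 task) into
>      * one or more v3 tasks of roughly desiredBytesPerTask. A small partition
>      * fits in a single v3 task; a skewed one is split across several.
>      */
>     public static List<List<Integer>> packOnePartition(long[] segmentBytes,
>                                                        long desiredBytesPerTask) {
>         List<List<Integer>> v3Tasks = new ArrayList<>();
>         List<Integer> current = new ArrayList<>();
>         long currentBytes = 0;
>         for (int src = 0; src < segmentBytes.length; src++) {
>             current.add(src);
>             currentBytes += segmentBytes[src];
>             if (currentBytes >= desiredBytesPerTask) {
>                 v3Tasks.add(current);
>                 current = new ArrayList<>();
>                 currentBytes = 0;
>             }
>         }
>         if (!current.isEmpty()) {
>             v3Tasks.add(current);
>         }
>         return v3Tasks;
>     }
> }
> {code}
>    As an illustration, with 5 regions for a given dt only 5 of the 20 partition
> indexes would be non-empty, and each partition's segments would typically fit
> in a single v3 task; a heavily skewed (dt, region) would be split by
> packOnePartition into several tasks.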



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
