[ https://issues.apache.org/jira/browse/TEZ-3865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252182#comment-16252182 ]
Gopal V edited comment on TEZ-3865 at 11/14/17 9:14 PM:
--------------------------------------------------------

[~rohini]: is there some overlap between this idea and TEZ-3209 ?


was (Author: gopalv):
[~rohini]: is there some overlap between this idea and TEZ-3269 ?


> A new vertex manager to partition data for STORE
> -------------------------------------------------
>
>                 Key: TEZ-3865
>                 URL: https://issues.apache.org/jira/browse/TEZ-3865
>             Project: Apache Tez
>          Issue Type: New Feature
>            Reporter: Rohini Palaniswamy
>
> Restricting the number of output files is a very common use case. In Pig,
> users currently add an ORDER BY, GROUP BY or DISTINCT with the required
> parallelism before STORE to achieve it. All of these operations add
> unnecessary processing overhead. It would be ideal if the STORE clause
> supported the PARALLEL statement and the partitioning of data were handled
> in a simpler and more efficient manner.
> Partitioning of the data can be achieved using a very efficient vertex
> manager as described below. Calling it PartitionVertexManager (PVM) for now
> until someone proposes a better name. The explanation uses Pig examples, but
> the logic is the same for Hive as well.
> There are multiple cases to consider when storing:
> 1) No partitions
>    - Data is stored into a single directory using FileOutputFormat
>      implementations.
> 2) Partitions
>    - Data is stored into multiple partitions, i.e. static or dynamic
>      partitioning with HCat.
> 3) HBase
>    - I have forgotten what exactly my thoughts were on storing to multiple
>      regions. Will update once I remember.
> Consider the script below with pig.exec.bytes.per.reducer (this setting is
> usually translated to tez.shuffle-vertex-manager.desired-task-input-size
> with ShuffleVertexManager) set to 1G.
> {code}
> A = LOAD 'data' ....;
> B = GROUP A BY $0 PARALLEL 1000;
> C = FOREACH B GENERATE group, COUNT(A.a), SUM(A.b), ..;
> D = STORE C into 'output' using SomeStoreFunc() PARALLEL 20;
> {code}
> The implementation will have 3 vertices:
> v1 - LOAD vertex
> v2 - GROUP BY vertex
> v3 - STORE vertex
> PVM will be used on v3. It is similar to ShuffleVertexManager but with some
> differences. The main difference is that the source vertex does not care
> about the parallelism of the destination vertex, and the number of
> partitioned outputs it produces does not depend on it.
> 1) Case of no partitions
>    Each task in vertex v2 will produce a single-partition output (no
> Partitioner is required). The PVM will bucket this single-partition data
> from the 1000 source tasks into multiple destination tasks of v3, trying to
> keep 1G per task with a maximum of 20 tasks (auto parallelism); see the
> first sketch below.
> 2) Partitions
>    Let us say the table has 2 partition keys (dt and region). Since there
> could be any number of regions for a given date, we will use the store
> parallelism as the upper limit on the number of partitions, i.e. a
> HashPartitioner with numReduceTasks as 20 and (dt, region) as the partition
> key. If there are only 5 regions, then each task of v2 will produce 5
> partitions (with the remaining 15 being empty), assuming no hash collision.
> If there are 30 regions, then each task of v2 will produce 20 partitions.
>    When the PVM groups segments, it will try to place all Partition0
> segments into one v3 task as far as possible. Based on skew, a partition
> could end up spread across more tasks, i.e. there is no restriction that one
> partition must go to the same reducer task; see the second sketch below.
> Doing this avoids having to open multiple ORC files in one task when doing
> dynamic partitioning and is very efficient, reducing namespace usage even
> further while keeping file sizes more uniform.
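A minimal sketch of the size-based grouping described in case 1 above. This is plain Java with made-up names (SizeBasedGrouping, group), not the Tez VertexManagerPlugin API, and the greedy bucketing is only an assumption about how a PVM could keep ~1G per v3 task while honouring the 20-task cap:

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: not the Tez VertexManagerPlugin API.
public class SizeBasedGrouping {

    // Groups source task indexes into destination-task buckets, aiming for
    // desiredBytesPerTask (e.g. 1G) per bucket but never planning for more
    // than maxTasks (e.g. the STORE parallelism of 20) buckets.
    static List<List<Integer>> group(long[] segmentBytes,
                                     long desiredBytesPerTask,
                                     int maxTasks) {
        long totalBytes = 0;
        for (long b : segmentBytes) {
            totalBytes += b;
        }
        // If 1G per task would need more than maxTasks buckets, raise the
        // per-task target so that roughly maxTasks buckets are produced.
        long target = Math.max(desiredBytesPerTask,
                (totalBytes + maxTasks - 1) / maxTasks);

        List<List<Integer>> buckets = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int src = 0; src < segmentBytes.length; src++) {
            if (!current.isEmpty() && currentBytes + segmentBytes[src] > target) {
                buckets.add(current);          // bucket is full, start a new v3 task
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(src);
            currentBytes += segmentBytes[src];
        }
        if (!current.isEmpty()) {
            buckets.add(current);
        }
        // A real implementation would also have to handle skewed segments
        // pushing the bucket count slightly past maxTasks.
        return buckets;
    }

    public static void main(String[] args) {
        long[] segmentBytes = new long[1000];               // 1000 v2 tasks
        Arrays.fill(segmentBytes, 50L * 1024 * 1024);        // ~50 MB each
        List<List<Integer>> buckets =
                group(segmentBytes, 1024L * 1024 * 1024, 20);
        System.out.println("v3 tasks: " + buckets.size());  // prints 20
    }
}
{code}

With 1000 source segments of ~50 MB each (about 48 GB total), a 1G target would need roughly 49 tasks, so the cap takes over and the sketch settles on 20 v3 tasks of about 2.4 GB each.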
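And a minimal sketch of the partitioned case. Each v2 task emits at most 20 segments because Hadoop's HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % 20 over (dt, region), so 5 distinct region values fill at most 5 of the 20 partition indexes. The PVM side below is again plain Java with hypothetical names (PartitionAwareGrouping, Segment), sketching how segments of the same partition index could be packed into one v3 task while still letting a skewed partition spill into additional tasks:

{code}
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not the Tez VertexManagerPlugin API.
public class PartitionAwareGrouping {

    // One shuffle segment: the data a given v2 task produced for a given
    // partition index (0..19 here).
    record Segment(int partition, int sourceTask, long bytes) {}

    // bytes[sourceTask][partition] holds the segment sizes reported by v2.
    static List<List<Segment>> group(long[][] bytes, long desiredBytesPerTask) {
        int numPartitions = bytes[0].length;
        List<List<Segment>> v3Tasks = new ArrayList<>();
        List<Segment> current = new ArrayList<>();
        long currentBytes = 0;

        // Walk partition indexes in order so that, ideally, every segment of
        // Partition0 lands in the same v3 task.
        for (int p = 0; p < numPartitions; p++) {
            for (int src = 0; src < bytes.length; src++) {
                long b = bytes[src][p];
                if (b == 0) {
                    continue;                  // empty partition segment
                }
                // A skewed partition overflows into the next v3 task: nothing
                // forces one partition to stay in a single task.
                if (!current.isEmpty() && currentBytes + b > desiredBytesPerTask) {
                    v3Tasks.add(current);
                    current = new ArrayList<>();
                    currentBytes = 0;
                }
                current.add(new Segment(p, src, b));
                currentBytes += b;
            }
        }
        if (!current.isEmpty()) {
            v3Tasks.add(current);
        }
        return v3Tasks;
    }
}
{code}

Walking the partition indexes in order is what keeps all Partition0 segments together; packing by size rather than strictly by index is what keeps file sizes uniform and lets several small partitions share a single v3 task.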