Rohini Palaniswamy created TEZ-3865:
---------------------------------------
Summary: A new vertex manager to partition data for STORE
Key: TEZ-3865
URL: https://issues.apache.org/jira/browse/TEZ-3865
Project: Apache Tez
Issue Type: New Feature
Reporter: Rohini Palaniswamy
Restricting the number of files in the output is a very common use case. In
Pig, users currently add an ORDER BY, GROUP BY or DISTINCT with the required
parallelism before the STORE to achieve it. All of these operations add
unnecessary processing overhead. It would be ideal if the STORE clause
supported the PARALLEL statement and the partitioning of data was handled in a
simpler and more efficient manner.
Partitioning of the data can be achieved using a very efficient vertex manager
as described below. Going to call it PartitionVertexManager (PVM) for now till
someone proposes a better name. I will explain using Pig examples, but the
logic is the same for Hive as well.
There are multiple cases to consider when storing:
1) No partitions
- Data is stored into a single directory using FileOutputFormat
implementations
2) Partitions
- Data is stored into multiple partitions. Case of static or dynamic
partitioning with HCat
3) HBase
I have kind of forgotten what exactly my thoughts were on this when storing
to multiple regions. Will update once I remember.
Let us consider the below script with pig.exec.bytes.per.reducer (this setting
is usually translated to tez.shuffle-vertex-manager.desired-task-input-size
with ShuffleVertexManager) set to 1G; a configuration sketch follows the
script.
{code}
A = LOAD 'data' ....;
B = GROUP A BY $0 PARALLEL 1000;
C = FOREACH B GENERATE group, COUNT(A.a), SUM(A.b), ..;
STORE C INTO 'output' USING SomeStoreFunc() PARALLEL 20;
{code}
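For reference, the 1G hint above corresponds to settings like the following.
This is only a sketch: the property keys are the ones named above, but the
Java wiring around them is illustrative.
{code}
import org.apache.hadoop.conf.Configuration;

public class DesiredTaskInputSize {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Pig-level knob: target input bytes per reduce task
    conf.setLong("pig.exec.bytes.per.reducer", 1024L * 1024 * 1024);
    // The Tez-level equivalent that ShuffleVertexManager honors
    conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size",
        1024L * 1024 * 1024);
    return conf;
  }
}
{code}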
The implementation will have 3 vertices.
v1 - LOAD vertex
v2 - GROUP BY vertex
v3 - STORE vertex
PVM will be used on v3. It is going to be similar to ShuffleVertexManager but
with some differences. The main difference is that the source vertex does not
care about the parallelism of the destination vertex, and the number of
partitioned outputs it produces does not depend on it.
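As a rough sketch of where this logic would live, the skeleton below extends
the same VertexManagerPlugin base class that ShuffleVertexManager does. The
method signatures assume the 0.8-line Tez API and the grouping itself is left
as comments (it is sketched under the two cases below); treat all of it as
approximate, not a working implementation.
{code}
import java.util.List;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;

public class PartitionVertexManager extends VertexManagerPlugin {

  public PartitionVertexManager(VertexManagerPluginContext context) {
    super(context);
  }

  @Override
  public void initialize() {
    // Read the desired bytes per task (1G) and the PARALLEL limit (20)
    // from the vertex manager payload / configuration.
  }

  @Override
  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    // Source (v2) tasks report their per-partition output sizes here,
    // as they do for ShuffleVertexManager. Accumulate the stats.
  }

  @Override
  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
    // Once enough of v2 has completed, group the reported partition
    // segments into destination tasks (see the two cases below) and
    // reconfigure v3's parallelism and routing via getContext().
  }

  @Override
  public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
    // No-op in this sketch.
  }

  @Override
  public void onRootVertexInitialized(String inputName,
      InputDescriptor inputDescriptor, List<Event> events) {
    // No-op: v3 has no root inputs in the example DAG.
  }
}
{code}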
1) Case of no partitions
Each task in vertex v2 will produce a single partition output (no
Partitioner is required). The PVM will bucket this single-partition data from
the 1000 source tasks into multiple destination tasks of v3, trying to keep 1G
per task with a max of 20 tasks (auto parallelism).
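A minimal sketch of that bucketing in plain Java follows. The class and
parameter names (SingleOutputGrouper, outputBytes) are hypothetical, and the
per-source-task sizes are assumed to be already known from vertex manager
events.
{code}
import java.util.ArrayList;
import java.util.List;

public final class SingleOutputGrouper {

  /**
   * Buckets the single-partition output of each source task into at most
   * maxParallelism destination tasks of roughly desiredBytesPerTask each.
   * Returns, per destination task, the source task indexes it should fetch.
   */
  static List<List<Integer>> group(long[] outputBytes,      // reported by each of the 1000 v2 tasks
                                   long desiredBytesPerTask, // 1G in the example
                                   int maxParallelism) {     // 20, from PARALLEL 20
    long total = 0;
    for (long b : outputBytes) {
      total += b;
    }
    // If 1G per task would need more than 20 tasks, raise the per-task
    // target so the cap of 20 destination tasks still holds.
    long target = Math.max(desiredBytesPerTask,
        (total + maxParallelism - 1) / maxParallelism);

    List<List<Integer>> groups = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    long currentBytes = 0;
    for (int src = 0; src < outputBytes.length; src++) {
      boolean canStartNewGroup = groups.size() < maxParallelism - 1;
      if (!current.isEmpty() && canStartNewGroup
          && currentBytes + outputBytes[src] > target) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(src);
      currentBytes += outputBytes[src];
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups; // groups.size() becomes v3's parallelism
  }
}
{code}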
2) Partitions
Let us say the table has 2 partition keys (dt and region). Since there could
be any number of regions for a given date, we will use the store parallelism
as the upper limit on the number of partitions, i.e. a HashPartitioner with
numReduceTasks as 20 and (dt, region) as the partition key. If there are only
5 regions, then each task of v2 will produce 5 partitions (with the other 15
being empty) assuming no hash collisions. If there are 30 regions, then each
task of v2 will produce 20 partitions.
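To make the partition counting concrete, here is the computation in plain
Java. The (hash & Integer.MAX_VALUE) % numReduceTasks formula is Hadoop's
HashPartitioner; hashing the composite (dt, region) key with Objects.hash is
an assumption for illustration.
{code}
import java.util.Objects;

public final class PartitionKeyExample {

  static final int NUM_PARTITIONS = 20; // PARALLEL 20 acts as the upper limit

  /**
   * With 5 distinct (dt, region) values at most 5 of the 20 partitions are
   * non-empty (barring collisions); with 30 values all 20 can be hit.
   */
  static int partitionFor(String dt, String region) {
    int hash = Objects.hash(dt, region);                // composite key hash (assumed)
    return (hash & Integer.MAX_VALUE) % NUM_PARTITIONS; // HashPartitioner formula
  }
}
{code}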
When it groups, the PVM will try to group all Partition0 segments into one v3
task as much as possible. Based on skew, it could end up in more tasks; i.e.
there is no restriction that one partition must go to a single reducer task.
Doing this avoids having to open multiple ORC files in one task when doing
dynamic partitioning, and is very efficient: it reduces namespace usage even
further while keeping file sizes more uniform.
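A hypothetical sketch of that per-partition grouping, with skew handled by
spilling into extra tasks (Segment and groupOnePartition are illustrative
names; sizes are assumed to come from the same stats as above):
{code}
import java.util.ArrayList;
import java.util.List;

public final class PartitionGrouper {

  /** One (source task, partition) output segment and its reported size. */
  static final class Segment {
    final int sourceTask;
    final long bytes;
    Segment(int sourceTask, long bytes) {
      this.sourceTask = sourceTask;
      this.bytes = bytes;
    }
  }

  /**
   * Packs all segments of one partition (e.g. Partition0 from the 1000 v2
   * tasks) into as few v3 tasks as possible, each holding at most
   * desiredBytesPerTask. A small partition collapses into a single v3 task
   * (one ORC file); a skewed one spills into additional tasks.
   */
  static List<List<Segment>> groupOnePartition(List<Segment> segments,
                                               long desiredBytesPerTask) {
    List<List<Segment>> tasks = new ArrayList<>();
    List<Segment> current = new ArrayList<>();
    long currentBytes = 0;
    for (Segment s : segments) {
      if (!current.isEmpty() && currentBytes + s.bytes > desiredBytesPerTask) {
        tasks.add(current); // skew: this partition needs one more v3 task
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(s);
      currentBytes += s.bytes;
    }
    if (!current.isEmpty()) {
      tasks.add(current);
    }
    return tasks;
  }
}
{code}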