[
https://issues.apache.org/jira/browse/PIG-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rohini Palaniswamy updated PIG-3775:
------------------------------------
Description:
When implementing Pig union, we need to gather data from two or more upstream
vertexes without sorting. The vertex itself might consist of several tasks. The
same can be done for the partitioner vertex in orderby and skewed join instead
of a 1-1 edge for some cases of parallelism.
TEZ-661 has been created to add a custom output and input for that in Tez. It
is currently not among the Tez team's priorities, but it is important for us as
it will give good performance gains. We can write the custom input/output,
contribute it to Tez, and make the corresponding changes in Pig.
This is a candidate project for Google Summer of Code 2014. More information
about the program can be found at
https://cwiki.apache.org/confluence/display/PIG/GSoc2014
was:
When implementing Pig union, we need to gather data from two or more upstream
vertexes without sorting. The vertex itself might consist of several tasks. The
same can be done for the partitioner vertex in orderby and skewed join instead
of a 1-1 edge for some cases of parallelism.
TEZ-661 has been created to add a custom output and input for that in Tez. It
is currently not among the Tez team's priorities, but it is important for us as
it will give good performance gains. We can write the custom input/output and
contribute it to Tez and make the corresponding changes in Pig. Marking this as
a candidate for GSOC 2014.
Labels: GSOC2014 (was: gsoc2014)
To give more background and detail, I am copying information from what I
exchanged with one of the interested students.
Pig on Tez:
Tez is a DAG execution framework and is intended as the replacement
execution engine for Pig in place of the traditional MapReduce.
You can read about Tez
(https://github.com/apache/incubator-tez/blob/master/README.md) and Pig on Tez
(https://issues.apache.org/jira/browse/PIG-3446), and watch the presentations
from the HUG (Hadoop User Group) Feb 2014 meetup -
http://www.youtube.com/results?search_query=pig%20on%20tez%20hug&sm=3. Slides
on Tez are also available at
http://qconsf.com/system/files/presentation-slides/Apache-Tez-Accelerating-Hadoop-Query-Processing.pdf.
Union:
The case of union is simple. Consider the following case:
{code}
A = LOAD 'a' ...;
B = LOAD 'b' ...;
C = UNION A, B PARALLEL 10;
D = GROUP C BY ...;
{code}
In Tez, the loading of A will be in Vertex 1 and the loading of B in Vertex 2.
The input from A and B will be sent to Vertex 3 (group by) as a composite input
using an Alias Vertex (PIG-3743) over a scatter-gather edge. A scatter-gather
edge sorts and partitions the output (OnFileSortedOutput) on the predecessor
vertex, then shuffles and merge-sorts the input (ShuffledMergedInput) on the
successor vertex, similar to what happens between map tasks and reduce tasks in
MR. But since union requires neither sorting nor grouping, we can go with
OnFileUnorderedPartitionedOutput and ShuffledUnorderedKVInput (TEZ-910).
OnFileUnorderedPartitionedOutput needs to be implemented for this problem and
contributed back to Tez (TEZ-661), as it will be a generic output that can be
reused by other projects like Hive.
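Roughly, the union edge would then be declared along these lines. This is a
minimal sketch only: the Edge/EdgeProperty factory methods shown are from later
Tez releases, and the output/input class names are the ones proposed in
TEZ-661/TEZ-910, so the exact names and packages may differ by Tez version.
{code}
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.Vertex;

public class UnsortedUnionEdgeSketch {
  // Builds one edge from a load vertex into the union/group-by vertex using an
  // unsorted but still partitioned scatter-gather movement. Class names below
  // are assumptions based on TEZ-661/TEZ-910.
  static Edge unsortedEdge(Vertex loadVertex, Vertex groupByVertex) {
    EdgeProperty prop = EdgeProperty.create(
        DataMovementType.SCATTER_GATHER,   // partitioned + shuffled, no sort
        DataSourceType.PERSISTED,
        SchedulingType.SEQUENTIAL,
        OutputDescriptor.create(
            "org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedOutput"),
        InputDescriptor.create(
            "org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput"));
    return Edge.create(loadVertex, groupByVertex, prop);
  }
}
{code}
Compared to the default OnFileSortedOutput/ShuffledMergedInput pair, this keeps
the partitioning and shuffle but skips the sort and merge, which is all that
union needs.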
Distributed Orderby and Skewed Join:
These are more complex and somewhat similar in concept, so I will just go into
detail on one of them - distributed orderby. In MR, there are three jobs to do
an orderby:
Job 1 - Map-only job that loads the hdfs data, processes any statements such as
filters before the order by, and stores the result in hdfs in the Pig
intermediate file format.
Job 2 (Sampler job) - Map-Reduce job where the map tasks load the intermediate
data from Job 1 and output 100 samples per task, and a single reducer
aggregates the samples, creates a quantile map that takes the skew of the data
into account, and stores it in hdfs.
Job 3 (Orderby job) - Map-Reduce job where the map tasks load the intermediate
data from Job 1 and the quantile map from Job 2 (via the Distributed Cache) and
partition the data using the WeightedRangePartitioner, which uses the quantile
map to do range partitioning. The data produced by each reducer is sorted. It
is a distributed order by because, if there are 10 reducers generating part
files part-r-00000 to part-r-00009, the data is totally ordered when the files
are read in order, thanks to the range partitioning.
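At its core, the range partitioning in Job 3 is a binary search of each key
against the sampled cut points. Here is a rough sketch of that idea; the class
is hypothetical and leaves out the weighting that the actual
WeightedRangePartitioner uses to spread heavily skewed keys across several
reducers.
{code}
import java.util.Arrays;

// Sketch of quantile-map driven range partitioning: each key is routed to the
// range it falls into, so reducer i receives only keys between cut point i-1
// and cut point i.
public class RangePartitionSketch {
  private final String[] cutPoints; // sorted boundary keys, numReducers - 1 of them

  public RangePartitionSketch(String[] sortedCutPoints) {
    this.cutPoints = sortedCutPoints;
  }

  public int getPartition(String key, int numReducers) {
    int idx = Arrays.binarySearch(cutPoints, key);
    // binarySearch returns -(insertionPoint) - 1 when the key is not found
    int partition = idx >= 0 ? idx : -(idx + 1);
    return Math.min(partition, numReducers - 1);
  }
}
{code}
Because each reducer then owns a contiguous key range, reading part-r-00000
through part-r-00009 in order yields a totally ordered result.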
In Tez, this is currently implemented as:
Vertex 1 - Reads the hdfs data once, computes the samples (100 or a
configurable number of samples per task, using reservoir sampling - see the
sketch after the vertex descriptions) and also arranges the data based on the
order by key. The samples are sent to Vertex 2 and the data is sent to
Vertex 3.
Vertex 2 - Has only 1 task, which aggregates the samples from the Vertex 1
tasks and constructs the quantile map.
Vertex 3 - The tasks in Vertex 3 take the data from Vertex 1 over a 1-1 edge.
The WeightedRangePartitioner in the Vertex 3 tasks takes the quantile map from
Vertex 2 through a broadcast input. The data is partitioned by the
WeightedRangePartitioner and the outputs are sent to Vertex 4. This is the
equivalent of the map phase of Job 3 in MR.
Vertex 4 - This is the equivalent of the reduce phase of Job 3 in MR.
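The per-task sampling in Vertex 1 is standard reservoir sampling; a
self-contained sketch of the technique is below (the helper class is
hypothetical, not Pig's actual sampler).
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Reservoir sampling (Algorithm R): keeps a uniform random sample of up to
// `size` keys from a stream of unknown length, as each Vertex 1 task would do
// for its share of the order-by keys.
public class ReservoirSketch<T> {
  private final List<T> reservoir;
  private final int size;
  private final Random random = new Random();
  private long seen = 0;

  public ReservoirSketch(int size) { // e.g. 100 samples per task
    this.size = size;
    this.reservoir = new ArrayList<T>(size);
  }

  public void add(T key) {
    seen++;
    if (reservoir.size() < size) {
      reservoir.add(key);
    } else {
      // Keep the new key with probability size/seen, replacing a random slot.
      long j = (long) (random.nextDouble() * seen);
      if (j < size) {
        reservoir.set((int) j, key);
      }
    }
  }

  public List<T> samples() {
    return reservoir;
  }
}
{code}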
Vertex 1 -> Vertex 2 is Scatter Gather edge.
Vertex 2 -> Vertex 3 is Broadcast edge.
Vertex 1 -> Vertex 3 is 1-1 edge.
Vertex 3 -> Vertex 4 is Scatter Gather edge.
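In Tez DAG terms, those four edges map onto three DataMovementTypes. The sketch
below shows only the edge shapes; the vertices, processors and the concrete
output/input classes (which differ per edge) are assumed to be set up
elsewhere, and the factory methods are again from later Tez releases.
{code}
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.Vertex;

public class OrderByEdgesSketch {
  // Placeholder descriptor names; each edge really uses its own output/input
  // classes (sorted for the final shuffle, unordered or broadcast elsewhere).
  private static final String OUT = "output.class.for.this.edge";
  private static final String IN = "input.class.for.this.edge";

  static Edge edge(Vertex from, Vertex to, DataMovementType movement) {
    return Edge.create(from, to, EdgeProperty.create(
        movement, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
        OutputDescriptor.create(OUT), InputDescriptor.create(IN)));
  }

  static void wire(DAG dag, Vertex v1, Vertex v2, Vertex v3, Vertex v4) {
    dag.addEdge(edge(v1, v2, DataMovementType.SCATTER_GATHER)); // samples
    dag.addEdge(edge(v2, v3, DataMovementType.BROADCAST));      // quantile map
    dag.addEdge(edge(v1, v3, DataMovementType.ONE_TO_ONE));     // data (today)
    dag.addEdge(edge(v3, v4, DataMovementType.SCATTER_GATHER)); // final shuffle
  }
}
{code}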
Parallelism (no. of tasks):
Vertex 1 - Parallelism is equal to the number of input splits in the hdfs
data.
Vertex 2 - Parallelism is 1, as we need one task to aggregate all the samples.
Vertex 3 - Same as Vertex 1, as it is a 1-1 edge.
Vertex 4 - Parallelism set on the order by using the PARALLEL clause, or the
default parallelism set for the pig script.
Initially, Vertex 1 -> Vertex 3 was a Scatter-Gather edge with a
RoundRobinPartitioner, and Vertex 3 had the same parallelism as Vertex 4. But
the performance was worse due to the overhead of shuffle and sort. The
RoundRobinPartitioner was also bad for performance because, if the data was
already slightly sorted, it would not take advantage of that. So we moved to a
1-1 edge, as it gave far better performance. But we want to move to a
Scatter-Gather edge with unordered, non-grouped, partitioned output.
Reasons being:
- The idea of using a 1-1 edge is that the tasks of Vertex 1 and Vertex 3
connected by the 1-1 edge will run on the same node (or with the JVM reused)
and the data will be read from local disk. But that is not the case now; 1-1
scheduling needs to be fixed in Tez (TEZ-800) and is currently not on the
priority list of the Tez team.
- Another reason is that if the input data size is big, Vertex 1 launches a lot
of tasks because there are a lot of input splits (let's say 10K). If there was
a filter clause before the order by, then Vertex 1 will reduce the size of the
data a lot. But because it is a 1-1 edge, Vertex 3 launches the same number of
tasks as Vertex 1 (10K), which is bad - by now the data has probably been
reduced to 1K tasks' worth. This is a good candidate for applying the Automatic
Reducer Parallelism (ARP) feature in Tez. This is not a problem with MR, as Job
1 does the filter and stores the intermediate output to disk, so the number of
map tasks of Job 2 and Job 3 depends on the input splits of the intermediate
data.
- If there are 10K tasks in Vertex 3 and the Vertex 4 parallelism is set to
500, each reducer has to merge 10K map outputs from Vertex 3, which is very bad
for performance.
To summarize:
For distributed orderby, we need a Scatter-Gather edge with unordered,
non-grouped, partitioned output from Vertex 1 to Vertex 3, and we also need to
determine the parallelism of Vertex 3 using "Automatic Reducer Parallelism" in
Tez. We also need to come up with a good partitioning algorithm that
distributes the skewed data evenly to Vertex 3 efficiently. A similar
implementation needs to be done for skewed join as well.
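The parallelism part is essentially the arithmetic below: pick the number of
Vertex 3 tasks from the bytes Vertex 1 actually produced rather than from its
split count. This is only a toy illustration of the idea behind ARP, not the
actual Tez implementation, and the bytes-per-task target is made up.
{code}
// Toy sketch of automatic reducer parallelism: shrink the downstream task
// count to match the bytes actually produced upstream, instead of keeping one
// task per input split.
public class AutoParallelismSketch {

  static int desiredTasks(long bytesProducedUpstream, long bytesPerTask,
                          int maxTasks) {
    long tasks = (bytesProducedUpstream + bytesPerTask - 1) / bytesPerTask; // ceil
    return (int) Math.max(1, Math.min(tasks, maxTasks));
  }

  public static void main(String[] args) {
    // Example from the description: 10K splits, but a filter shrinks the data
    // to roughly 1K tasks' worth (assuming an illustrative 256 MB per task).
    long produced = 1000L * 256 * 1024 * 1024;
    System.out.println(desiredTasks(produced, 256L * 1024 * 1024, 10000)); // 1000
  }
}
{code}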
If time permits, we also need to determine the parallelism of Vertex 4
dynamically when there is no PARALLEL clause or default parallelism set for the
pig script. That is very tricky, as it has to be determined during the
execution of Vertex 1, because the sampler (Vertex 2) needs to know the
parallelism of Vertex 4 to construct the quantile map.
Hopefully, by the time the GSOC project starts, the committers working on Pig
on Tez will have the basic ARP framework in Pig ready enough to be usable for
this case.
> Use unsorted shuffle in Union, Orderby, Skewed Join to improve performance in
> Tez
> ---------------------------------------------------------------------------------
>
> Key: PIG-3775
> URL: https://issues.apache.org/jira/browse/PIG-3775
> Project: Pig
> Issue Type: Sub-task
> Components: tez
> Reporter: Rohini Palaniswamy
> Labels: GSOC2014
> Fix For: tez-branch
>
>
> When implementing Pig union, we need to gather data from two or more upstream
> vertexes without sorting. The vertex itself might consist of several tasks.
> The same can be done for the partitioner vertex in orderby and skewed join
> instead of a 1-1 edge for some cases of parallelism.
> TEZ-661 has been created to add a custom output and input for that in Tez. It
> is currently not among the Tez team's priorities, but it is important for us
> as it will give good performance gains. We can write the custom input/output
> and contribute it to Tez and make the corresponding changes in Pig.
> This is a candidate project for Google Summer of Code 2014. More information
> about the program can be found at
> https://cwiki.apache.org/confluence/display/PIG/GSoc2014
--
This message was sent by Atlassian JIRA
(v6.2#6252)