[ https://issues.apache.org/jira/browse/PIG-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551055#comment-15551055 ]
Bert Passek commented on PIG-4969:
----------------------------------
Hi,
Something broke with this patch: I can no longer run even a simple Pig script
on Spark.
{code:title=test.pig|borderStyle=solid}
b = GROUP a BY id;
c = FOREACH b GENERATE
group as id,
COUNT(a);
{code}
I got an IndexOutOfBoundsException:
{code:borderStyle=solid}
java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at org.apache.pig.data.DefaultTuple.get(DefaultTuple.java:117)
at org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter$MergeValuesFunction.apply(ReduceByConverter.java:185)
at org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter$MergeValuesFunction.apply(ReduceByConverter.java:163)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{code}
I tried to locate the error: MergeValuesFunction, which is used by
ReduceByConverter, should generate tuples with 2 elements, and this is not the
case. Maybe the newly introduced
org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeFunction
should be used in MergeValuesFunction.
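To illustrate the failure mode outside of Pig, here is a minimal sketch in plain Java. It does not use Pig's Tuple API: the class `MergeSketch`, the method `merge`, and the assumed layout (key in field 0, partial count in field 1) are hypothetical and only mirror the description above. It shows that reading field 1 of a one-element tuple raises IndexOutOfBoundsException, and how a size check makes the broken assumption explicit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeSketch {
    // Hypothetical stand-in for merging two (key, partialCount) tuples.
    // Plain lists are used here instead of Pig's Tuple type.
    static List<Object> merge(List<Object> left, List<Object> right) {
        // Guard: field 1 must exist on both sides before it is read.
        if (left.size() < 2 || right.size() < 2) {
            throw new IllegalArgumentException(
                "expected (key, value) tuples, got sizes "
                + left.size() + " and " + right.size());
        }
        long leftCount = (Long) left.get(1);
        long rightCount = (Long) right.get(1);
        List<Object> merged = new ArrayList<>();
        merged.add(left.get(0));            // keep the grouping key
        merged.add(leftCount + rightCount); // combine the partial counts
        return merged;
    }

    public static void main(String[] args) {
        List<Object> a = Arrays.asList("id1", 2L);
        List<Object> b = Arrays.asList("id1", 3L);
        System.out.println(merge(a, b));

        // A tuple that carries only the key, as the stack trace suggests.
        List<Object> keyOnly = new ArrayList<>(Arrays.asList("id1"));
        try {
            keyOnly.get(1); // unguarded read of a missing field
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unguarded read failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Whether the right fix is a guard like this or producing two-element tuples upstream (for example via LocalRearrangeFunction) depends on the actual code in ReduceByConverter, which this sketch does not reproduce.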
Regards
> Optimize combine case for spark mode
> ------------------------------------
>
> Key: PIG-4969
> URL: https://issues.apache.org/jira/browse/PIG-4969
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4969_2.patch, PIG-4969_3.patch
>
>
> Our results from the 1 TB PigMix benchmark show that the combine case runs
> slower in Spark mode than in MR mode.
> ||Script||MR||Spark||
> |L_1|8089|10064|
> L1.pig
> {code}
> register pigperf.jar;
> A = load '/user/pig/tests/data/pigmix/page_views' using
> org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
> as (user, action, timespent, query_term, ip_addr, timestamp,
> estimated_revenue, page_info, page_links);
> B = foreach A generate user, (int)action as action, (map[])page_info as
> page_info,
> flatten((bag{tuple(map[])})page_links) as page_links;
> C = foreach B generate user,
> (action == 1 ? page_info#'a' : page_links#'b') as header;
> D = group C by user parallel 40;
> E = foreach D generate group, COUNT(C) as cnt;
> store E into 'L1out';
> {code}
> The resulting Spark plan:
> {code}
> [exec] #--------------------------------------------------
> [exec] # Spark Plan
> [exec] #--------------------------------------------------
> [exec]
> [exec] Spark node scope-38
> [exec] E: Store(hdfs://bdpe81:8020/user/root/output/pig/L1out:org.apache.pig.builtin.PigStorage) - scope-37
> [exec] |
> [exec] |---E: New For Each(false,false)[tuple] - scope-42
> [exec] | |
> [exec] | Project[bytearray][0] - scope-39
> [exec] | |
> [exec] | Project[bag][1] - scope-40
> [exec] |
> [exec] | POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-41
> [exec] | |
> [exec] | |---Project[bag][1] - scope-57
> [exec] |
> [exec] |---Reduce By(false,false)[tuple] - scope-47
> [exec] | |
> [exec] | Project[bytearray][0] - scope-48
> [exec] | |
> [exec] | POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-49
> [exec] | |
> [exec] | |---Project[bag][1] - scope-50
> [exec] |
> [exec] |---D: Local Rearrange[tuple]{bytearray}(false) - scope-53
> [exec] | |
> [exec] | Project[bytearray][0] - scope-55
> [exec] |
> [exec] |---E: New For Each(false,false)[bag] - scope-43
> [exec] | |
> [exec] | Project[bytearray][0] - scope-44
> [exec] | |
> [exec] | POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-45
> [exec] | |
> [exec] | |---Project[bag][1] - scope-46
> [exec] |
> [exec] |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-56
> [exec] |
> [exec] |---C: New For Each(false,false)[bag] - scope-26
> [exec] | |
> [exec] | Project[bytearray][0] - scope-13
> [exec] | |
> [exec] | POBinCond[bytearray] - scope-22
> [exec] | |
> [exec] | |---Equal To[boolean] - scope-17
> [exec] | | |
> [exec] | | |---Project[int][1] - scope-15
> [exec] | | |
> [exec] | | |---Constant(1) - scope-16
> [exec] | |
> [exec] | |---POMapLookUp[bytearray] - scope-19
> [exec] | | |
> [exec] | | |---Project[map][2] - scope-18
> [exec] | |
> [exec] | |---POMapLookUp[bytearray] - scope-21
> [exec] | |
> [exec] | |---Project[map][3] - scope-20
> [exec] |
> [exec] |---B: New For Each(false,false,false,true)[bag] - scope-12
> [exec] | |
> [exec] | Project[bytearray][0] - scope-1
> [exec] | |
> [exec] | Cast[int] - scope-4
> [exec] | |
> [exec] | |---Project[bytearray][1] - scope-3
> [exec] | |
> [exec] | Cast[map:[]] - scope-7
> [exec] | |
> [exec] | |---Project[bytearray][2] - scope-6
> [exec] | |
> [exec] | Cast[bag:{([])}] - scope-10
> [exec] | |
> [exec] | |---Project[bytearray][3] - scope-9
> [exec] |
> [exec] |---A: Load(/user/pig/tests/data/pigmix/page_views:org.apache.pig.test.pigmix.udf.PigPerformanceLoader) - scope-0--------
> {code}
> We can combine LocalRearrange (scope-53) and ReduceBy (scope-47) into one
> physical operator to remove the redundant map operations, as we did in
> PIG-4797 (Optimization for join/group case for spark mode).