[
https://issues.apache.org/jira/browse/PIG-4675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048222#comment-15048222
]
Xianda Ke commented on PIG-4675:
--------------------------------
hi [~kellyzly] & [~mohitsabharwal],
I have reviewed Kelly's patch. Here is my understanding:
After multiquery optimization, the PO_Load (from tmp file) was removed. Say,
it has a successor PO_XXX. When PO_XXX is converted to RDD ( physicalToRDD()
is called ), we have to collect all of the predecessors oF PO_XXX. But one of
the PO_XXX's predecessors is missing! it was the PO_Load which was removed
after multiquery optimization.
Kelly's approach:
When removing the PO_Load (from tmp file), store the from-to relationship in a
map. When convert a physcialOpertor to RDD, we collect the predecessors of the
physcialOperator, and check if any predecessor which is removed but exists in
the map.
----------------------------
Here are my comments:
MultiQueryOptimizerSpark.java:
Line 114: POSplit split = getSplit();
=> POSplit poSplit = createSplit();
// Because there are concepts such as SparkOper spliter and PhysicalOperator
POSplit. 'poSplit' is better.
Line 118: the variable name firstNodeRoots is a bit confusing. in my opinion,
rootPoNodesOfSparkOperSplitee is better :-)
Line 123: successorOfPoLoads => successorsOfPoLoad
SparkLauncher.java
line 624: the name predecessorsOfCurrentSparkOp should be
predecessorsOfCurrentPhyOper ?
> Operators with multiple predecessors fail under multiquery optimization
> -----------------------------------------------------------------------
>
> Key: PIG-4675
> URL: https://issues.apache.org/jira/browse/PIG-4675
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Peter Lin
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4675_1.patch, name.txt, ssn.txt, test.pig
>
>
> We are testing the spark branch pig recently with mapr3 and spark 1.5. It
> turns out if we use more than 1 store command in the pig script will have
> exception from the second store command.
> SSN = load '/test/ssn.txt' using PigStorage() as (ssn:long);
> SSN_NAME = load '/test/name.txt' using PigStorage() as (ssn:long,
> name:chararray);
> X = JOIN SSN by ssn LEFT OUTER, SSN_NAME by ssn USING 'replicated';
> R1 = limit SSN_NAME 10;
> store R1 into '/tmp/test1_r1';
> store X into '/tmp/test1_x';
> Exception Details:
> 15/09/11 13:37:00 INFO storage.MemoryStore: ensureFreeSpace(114448) called
> with curMem=359237, maxMem=503379394
> 15/09/11 13:37:00 INFO storage.MemoryStore: Block broadcast_2 stored as
> values in memory (estimated size 111.8 KB, free 479.6 MB)
> 15/09/11 13:37:00 INFO storage.MemoryStore: ensureFreeSpace(32569) called
> with curMem=473685, maxMem=503379394
> 15/09/11 13:37:00 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
> as bytes in memory (estimated size 31.8 KB, free 479.6 MB)
> 15/09/11 13:37:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
> memory on 10.51.2.82:55960 (size: 31.8 KB, free: 479.9 MB)
> 15/09/11 13:37:00 INFO spark.SparkContext: Created broadcast 2 from
> newAPIHadoopRDD at LoadConverter.java:88
> 15/09/11 13:37:00 WARN util.ClosureCleaner: Expected a closure; got
> org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter$ToTupleFunction
> 15/09/11 13:37:00 INFO spark.SparkLauncher: Converting operator POForEach
> (Name: SSN: New For Each(false)[bag] - scope-17 Operator Key: scope-17)
> 15/09/11 13:37:00 INFO spark.SparkLauncher: Converting operator POFRJoin
> (Name: X: FRJoin[tuple] - scope-22 Operator Key: scope-22)
> 15/09/11 13:37:00 ERROR spark.SparkLauncher: throw exception in
> sparkOperToRDD:
> java.lang.RuntimeException: Should have greater than1 predecessors for class
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.
> Got : 1
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil.assertPredecessorSizeGreaterThan(SparkUtil.java:93)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter.convert(FRJoinConverter.java:55)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter.convert(FRJoinConverter.java:46)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:633)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:600)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.physicalToRDD(SparkLauncher.java:621)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkOperToRDD(SparkLauncher.java:552)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.sparkPlanToRDD(SparkLauncher.java:501)
> at
> org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher.launchPig(SparkLauncher.java:204)
> at
> org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:301)
> at org.apache.pig.PigServer.launchPlan(PigServer.java:1390)
> at
> org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1375)
> at org.apache.pig.PigServer.execute(PigServer.java:1364)
> at org.apache.pig.PigServer.executeBatch(PigServer.java:415)
> at org.apache.pig.PigServer.executeBatch(PigServer.java:398)
> at
> org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:171)
> at
> org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:234)
> at
> org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
> at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
> at org.apache.pig.Main.run(Main.java:624)
> at org.apache.pig.Main.main(Main.java:170)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)