[ https://issues.apache.org/jira/browse/PIG-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Adam Szita updated PIG-5054:
----------------------------
    Attachment: piglog2.txt

> Initialize SchemaTupleBackend correctly in the backend in spark mode if the
> spark job has more than one stage
> ----------------------------------------------------------------------------------------------------
>
>                 Key: PIG-5054
>                 URL: https://issues.apache.org/jira/browse/PIG-5054
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: Adam Szita
>             Fix For: spark-branch
>
>         Attachments: piglog, piglog2.txt, script.pig
>
>
> After PIG-4970, we removed the serialization and deserialization of jobConf in 
> spark mode. But with the PigMix script L5.pig 
> {code}
> register pigperf.jar;
> A = load '/user/pig/tests/data/pigmix/page_views'
>     using org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = foreach A generate user;
> alpha = load '/user/pig/tests/data/pigmix/users' using PigStorage('\u0001')
>     as (name, phone, address, city, state, zip);
> beta = foreach alpha generate name;
> C = cogroup beta by name, B by user parallel 40;
> D = filter C by COUNT(beta) == 0;
> E = foreach D generate group;
> store E into 'L5out';
> {code}
> the following error is thrown in the log:
> {noformat}
> java.lang.RuntimeException: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
>      at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:89)
>      at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
>      at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
>      at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>      at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>      at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:57)
>      at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
>      at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
>      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111)
>      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
>      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
>      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
>      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
>      at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
>      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>      at org.apache.spark.scheduler.Task.run(Task.scala:89)
>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>      at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
>      at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:329)
>      at org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter$ForEachFunction$1$1.getNextResult(ForEachConverter.java:87)
>      at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:69)
>      ... 20 more
> Caused by: java.lang.RuntimeException: initialize was not called! Even when SchemaTuple feature is not set, it should be called.
>      at org.apache.pig.data.SchemaTupleBackend.newSchemaTupleFactory(SchemaTupleBackend.java:294)
>      at org.apache.pig.data.SchemaTupleFactory.getInstance(SchemaTupleFactory.java:119)
>      at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:350)
>      at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:321)
>      ... 22 more
> {noformat}
> It seems that SchemaTupleBackend is not correctly initialized. The reason is 
> that after PIG-4970 we initialize SchemaTupleBackend in 
> [PigInputFormatSpark#initialize|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L68]
>  before loading data (stage 0), but it is not initialized in the other stages 
> (such as stage 1). So whenever the job has more than one stage, the exception 
> above is thrown.
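> One possible direction (a minimal sketch, not the committed patch) is to 
> perform the initialization lazily on the executor side, in code that tasks of 
> every stage run, instead of only in PigInputFormatSpark#initialize. The helper 
> class name below is hypothetical, and the exact initialize() overload is an 
> assumption modelled on the MapReduce-side setup code:
> {code}
> import java.io.IOException;
> 
> import org.apache.hadoop.conf.Configuration;
> import org.apache.pig.data.SchemaTupleBackend;
> import org.apache.pig.impl.PigContext;
> import org.apache.pig.impl.util.ObjectSerializer;
> 
> // Hypothetical helper: each Spark task (from any stage) would call
> // ensureInitialized() before its operators request a SchemaTupleFactory.
> public class SchemaTupleBackendInit {
> 
>     // one flag per executor JVM; initialization only has to happen once
>     private static boolean initialized = false;
> 
>     public static synchronized void ensureInitialized(Configuration jobConf)
>             throws IOException {
>         if (initialized) {
>             return;
>         }
>         // "pig.pigContext" is the key Pig uses elsewhere to ship PigContext
>         PigContext pigContext = (PigContext) ObjectSerializer.deserialize(
>                 jobConf.get("pig.pigContext"));
>         // assumed overload, mirroring the MR-side setup call
>         SchemaTupleBackend.initialize(jobConf, pigContext);
>         initialized = true;
>     }
> }
> {code}
> The job configuration would still have to reach the executors for every 
> stage, e.g. the same way PigInputFormatSpark obtains it today.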



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
