[ https://issues.apache.org/jira/browse/PIG-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701856#comment-15701856 ]
Adam Szita commented on PIG-5054:
---------------------------------

[~kellyzly] I can't reproduce this, even in yarn mode. I checked out Pig on Spark at the commit matching the creation time of this jira:
{code}
2871382fb110da683b3b6b7a0ea1eeba40778809 PIG-4920: Fail to use Javascript UDF in spark yarn client mode (Liyun via Xuefu)
{code}
Then I ran [^script.pig] (note that I had to make some minor modifications to your script: org.apache.pig.test.*udf.storefunc*.PigPerformanceLoader() is invalid, so I used org.apache.pig.test.*pigmix.udf*.PigPerformanceLoader() from pigmix.jar instead). The job always runs successfully on the already-deployed pigmix data. You can see the log here: [^piglog]. This was a 4-node cluster, btw. Am I missing anything to reproduce the above exception?

> Initialize SchemaTupleBackend correctly in backend in spark mode if spark
> job has more than 1 stage
> ----------------------------------------------------------------------------------------------------
>
>                 Key: PIG-5054
>                 URL: https://issues.apache.org/jira/browse/PIG-5054
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: Adam Szita
>             Fix For: spark-branch
>
>         Attachments: piglog, script.pig
>
>
> After PIG-4970, we removed the serialization and deserialization of jobConf
> in spark mode.
> But in the pigmix script L5.pig:
> {code}
> register pigperf.jar;
> A = load '/user/pig/tests/data/pigmix/page_views' using
>     org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = foreach A generate user;
> alpha = load '/user/pig/tests/data/pigmix/users' using PigStorage('\u0001')
>     as (name, phone, address, city, state, zip);
> beta = foreach alpha generate name;
> C = cogroup beta by name, B by user parallel 40;
> D = filter C by COUNT(beta) == 0;
> E = foreach D generate group;
> store E into 'L5out';
> {code}
> the following error is thrown in the log:
> {noformat}
> java.lang.RuntimeException: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:89)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:57)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
>         at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:329)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter$ForEachFunction$1$1.getNextResult(ForEachConverter.java:87)
>         at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:69)
>         ... 20 more
> Caused by: java.lang.RuntimeException: initialize was not called! Even when SchemaTuple feature is not set, it should be called.
>         at org.apache.pig.data.SchemaTupleBackend.newSchemaTupleFactory(SchemaTupleBackend.java:294)
>         at org.apache.pig.data.SchemaTupleFactory.getInstance(SchemaTupleFactory.java:119)
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:350)
>         at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:321)
>         ... 22 more
> {noformat}
> It seems that SchemaTupleBackend is not correctly initialized. The reason
> for this error is that after PIG-4970, we initialize SchemaTupleBackend in
> [PigInputFormatSpark#initialize|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L68]
> before we load data (stage 0), but it is not initialized in the other
> stages (such as stage 1). So if there is more than one stage, this
> exception is thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
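The failure mode described in the issue — a per-JVM backend that only stage 0's input format initializes, so tasks of later stages hit the "initialize was not called!" check — can be sketched in plain Java. This is illustrative only, not Pig's actual code: the class and method names below are made up, and the point is just the pattern of an idempotent guard that every stage's task setup calls before any tuples are processed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a SchemaTupleBackend-style per-JVM singleton.
// The fix sketched here: expose an idempotent ensureInitialized() and
// call it from every stage's executor-side setup, not only from the
// stage-0 input format.
final class StageLocalBackend {
    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    /** Safe to call from every stage; only the first call performs setup. */
    static void ensureInitialized() {
        if (INITIALIZED.compareAndSet(false, true)) {
            // one-time per-JVM setup would go here
        }
    }

    /** Fails the same way the trace above does if no stage initialized us. */
    static String newTupleFactory() {
        if (!INITIALIZED.get()) {
            throw new IllegalStateException("initialize was not called!");
        }
        return "factory";
    }
}
```

Under this sketch, a multi-stage Spark job stays healthy because the second and later stages, which never touch the input format, still trigger initialization through the guard before calling newTupleFactory().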