[ https://issues.apache.org/jira/browse/PIG-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Adam Szita updated PIG-5054: ---------------------------- Attachment: piglog2.txt > Initialize SchemaTupleBackend correctly in backend in spark mode if spark > job has more than 1 stage > ---------------------------------------------------------------------------------------------------- > > Key: PIG-5054 > URL: https://issues.apache.org/jira/browse/PIG-5054 > Project: Pig > Issue Type: Sub-task > Components: spark > Reporter: liyunzhang_intel > Assignee: Adam Szita > Fix For: spark-branch > > Attachments: piglog, piglog2.txt, script.pig > > > After PIG-4970, we remove the serialization and deserialization of jobConf in > spark mode. But in script of pigmix L5.pig > {code} > register pigperf.jar; > A = load '/user/pig/tests/data/pigmix/page_views' using > org.apache.pig.test.udf.storefunc.PigPerformanceLoader() > as (user, action, timespent, query_term, ip_addr, timestamp, > estimated_revenue, page_info, page_links); > B = foreach A generate user; > alpha = load '/user/pig/tests/data/pigmix/users' using PigStorage('\u0001') > as (name, phone, address, > city, state, zip); > beta = foreach alpha generate name; > C = cogroup beta by name, B by user parallel 40; > D = filter C by COUNT(beta) == 0; > E = foreach D generate group; > store E into 'L5out'; > {code} > following error is thrown out in log > {noformat} > java.lang.RuntimeException: > org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while > executing ForEach at [C[-1,-1]] > at > org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:89) > at > org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96) > at > scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) > at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) > at > scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) > at > org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:57) > at > org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96) > at > scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: > Error while executing ForEach at [C[-1,-1]] > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:329) > at > org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter$ForEachFunction$1$1.getNextResult(ForEachConverter.java:87) > at > org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:69) > ... 20 more > Caused by: java.lang.RuntimeException: initialize was not called! Even when > SchemaTuple feature is not set, it should be called. > at > org.apache.pig.data.SchemaTupleBackend.newSchemaTupleFactory(SchemaTupleBackend.java:294) > at > org.apache.pig.data.SchemaTupleFactory.getInstance(SchemaTupleFactory.java:119) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:350) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:321) > ... 22 more > {noformat} > It seems that SchemaTupleBackend is not correctly initialized. The reason > for this error is because after PIG-4970, we initialized SchemaTupleBackend > in > [PigInputFormatSpark#initialize|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L68] > before we load data(stage-0). But it is not initialized in other stage(such > as stage1). So if there are more than 1 stage, the exception will be thrown > out. -- This message was sent by Atlassian JIRA (v6.3.4#6332)