Hi All, Please help me.
I have 10 (tables data) parquet file in s3. I am reading and storing as Dataset<Row> then registered as temp table. One table driving whole flow so i am doing below.(When i am triggering query from Code Base: SparkSession spark = SparkSession.builder().appName("Test").getOrCreate(); Dataset<Row> citationDF = spark.read().parquet("s3://...") ... ... citationDF.createOrReplaceTempView("citation"); ... .... cit_num.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(Iterator<Row> iter) { while (iter.hasNext()) { Row record=iter.next(); int citation_num=record.getInt(0); String ci_query="select queries ....";//(i can execute this query outside of foreach) System.out.println("citation num:"+citation_num+" count:"+spark .sql(ci_query).count()); accum.add(1); System.out.println("accumulator count:"+accum); } } }); Error: 16/10/24 09:08:12 WARN TaskSetManager: Lost task 1.0 in stage 30.0 (TID 83, ip-10-95-36-172.dev): java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/10/24 09:08:12 INFO YarnScheduler: Stage 30 was cancelled 16/10/24 09:08:12 INFO DAGScheduler: ResultStage 30 (foreachPartition at CitationTest.java:108) failed in 0.421 s 16/10/24 09:08:12 INFO DAGScheduler: Job 23 failed: foreachPartition at CitationTest.java:108, took 0.578050 s Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 30.0 failed 4 times, most recent failure: Lost task 6.3 in stage 30.0 (TID 99, ip-dev): java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:883) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881) at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218) at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45) at com.elsevier.datasearch.CitationTest.main(CitationTest.java:108) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"