Why am I not able to access the SparkSession instance inside
foreachPartition? (I created the SparkSession instance in the main function.)
spark.sql("select 1").count(), or any other SQL query run inside
foreachPartition, throws a NullPointerException.
Please give me some ideas if you have faced this problem before.
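
To make the pattern clearer, here is a stripped-down sketch of what I am
doing (the class name, the toy range() dataset and the "select 1" query are
only placeholders for illustration; my real code is in the mail below):

import java.util.Iterator;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Repro {
  public static void main(String[] args) {
    // the session is created once here, on the driver
    SparkSession spark = SparkSession.builder().appName("Repro").getOrCreate();

    Dataset<Row> df = spark.range(10).toDF();  // any small driving dataset

    df.toJavaRDD().foreachPartition((Iterator<Row> iter) -> {
      while (iter.hasNext()) {
        iter.next();
        // this runs on an executor; spark.sql(...) fails here with
        // java.lang.NullPointerException at SparkSession.sessionState
        long count = spark.sql("select 1").count();
        System.out.println("count: " + count);
      }
    });
  }
}

As the stack traces below show, the NullPointerException always comes from
SparkSession.sessionState, and only when the call is made inside the
partition function; the same query runs fine on the driver.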

Thanks,
Selvam R

On Mon, Oct 24, 2016 at 10:23 AM, Selvam Raman <sel...@gmail.com> wrote:

> Hi All,
>
> Please help me.
>
> I have 10 Parquet files (the data of 10 tables) in S3.
>
> I am reading them as Dataset<Row> and then registering them as temp tables.
>
> One table drives the whole flow, so I am doing the below. (When I am triggering a query from
>
>
> Code Base:
>
> SparkSession spark = SparkSession.builder().appName("Test").getOrCreate();
>
> Dataset<Row> citationDF = spark.read().parquet("s3://...");
>
> ...
>
> ...
>
> citationDF.createOrReplaceTempView("citation");
>
> ...
>
> ....
>
> cit_num.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>()
> {
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public void call(Iterator<Row> iter)
>   {
>     while (iter.hasNext())
>     {
>       Row record = iter.next();
>       int citation_num = record.getInt(0);
>       String ci_query = "select queries ...."; // (i can execute this query outside of foreach)
>       System.out.println("citation num:" + citation_num + " count:" + spark.sql(ci_query).count());
>       accum.add(1);
>       System.out.println("accumulator count:" + accum);
>     }
>   }
> });
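>
> For comparison, the same ci_query runs without any problem when I execute
> it directly on the driver, before the foreachPartition call; just to
> illustrate:
>
> long cnt = spark.sql(ci_query).count();   // works fine on the driver
> System.out.println("driver-side count:" + cnt);
>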
> Error:
>
> 16/10/24 09:08:12 WARN TaskSetManager: Lost task 1.0 in stage 30.0 (TID 83, ip-10-95-36-172.dev): java.lang.NullPointerException
> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 16/10/24 09:08:12 INFO YarnScheduler: Stage 30 was cancelled
> 16/10/24 09:08:12 INFO DAGScheduler: ResultStage 30 (foreachPartition at CitationTest.java:108) failed in 0.421 s
> 16/10/24 09:08:12 INFO DAGScheduler: Job 23 failed: foreachPartition at CitationTest.java:108, took 0.578050 s
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 30.0 failed 4 times, most recent failure: Lost task 6.3 in stage 30.0 (TID 99, ip-dev): java.lang.NullPointerException
> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
> at com.elsevier.datasearch.CitationTest.main(CitationTest.java:108)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.NullPointerException
> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
