Thanks for pointing to the issue. Yes, I think it's the same one; the exception is below:
ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
java.io.NotSerializableException: TestCheckpointStreamingJson
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Regards,
Vasu C

On Thu, Nov 6, 2014 at 1:14 PM, Sean Owen <so...@cloudera.com> wrote:
> You didn't say what isn't serializable or where the exception occurs,
> but is it the same as this issue?
> https://issues.apache.org/jira/browse/SPARK-4196
>
> On Thu, Nov 6, 2014 at 5:42 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> > Dear All,
> >
> > I am getting a java.io.NotSerializableException for the code below. If the
> > line jssc.checkpoint(HDFS_CHECKPOINT_DIR); is commented out, there is no
> > exception. Please help.
> >
> > JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
> >     @Override
> >     public JavaStreamingContext create() {
> >         SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");
> >
> >         final JavaStreamingContext jssc = new JavaStreamingContext(
> >                 sparkConf, new Duration(300));
> >
> >         final JavaHiveContext javahiveContext = new JavaHiveContext(
> >                 jssc.sc());
> >
> >         javahiveContext.createParquetFile(Bean.class,
> >                 IMPALA_TABLE_LOC, true, new Configuration())
> >                 .registerTempTable(TEMP_TABLE_NAME);
> >
> >         // TODO create checkpoint directory for fault tolerance
> >         final JavaDStream<String> textFileStream = jssc
> >                 .textFileStream(HDFS_FILE_LOC);
> >
> >         textFileStream
> >                 .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
> >
> >                     @Override
> >                     public Void call(JavaRDD<String> rdd, Time time)
> >                             throws Exception {
> >                         if (rdd != null) {
> >                             if (rdd.count() > 0) {
> >                                 JavaSchemaRDD schRdd = javahiveContext
> >                                         .jsonRDD(rdd);
> >                                 schRdd.insertInto(TEMP_TABLE_NAME);
> >                             }
> >                         }
> >                         return null;
> >                     }
> >                 });
> >         jssc.checkpoint(HDFS_CHECKPOINT_DIR);
> >         return jssc;
> >     }
> > };
> >
> > // Get JavaStreamingContext from checkpoint data or create a new one
> > JavaStreamingContext context = JavaStreamingContext.getOrCreate(
> >         HDFS_CHECKPOINT_DIR, contextFactory);
> >
> > context.start(); // Start the computation
> > context.awaitTermination();
> >
> > Regards,
> > Vasu
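For anyone hitting this later: the stack trace shows checkpointing serializing the DStreamGraph, which transitively serializes the anonymous Function2 passed to foreachRDD. An anonymous inner class that uses enclosing state holds an implicit reference to its enclosing instance (here TestCheckpointStreamingJson, which is not Serializable), so the whole driver class gets dragged into the checkpoint. The Spark-free sketch below (class and field names are hypothetical, plain JDK only) demonstrates the mechanism: a static nested class serializes fine, while an equivalent anonymous inner class fails because of the hidden reference to its non-serializable outer object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class InnerClassSerialization {

    // The function type we want to serialize, analogous to Spark's Function2.
    interface Task extends Serializable {
        String run();
    }

    // Static nested class: no hidden reference to an outer instance,
    // so it serializes on its own.
    static class StaticTask implements Task {
        private final String tableName; // pass state in explicitly

        StaticTask(String tableName) {
            this.tableName = tableName;
        }

        public String run() {
            return tableName;
        }
    }

    // Note: InnerClassSerialization itself is NOT Serializable,
    // like the driver class in the thread.
    private final String tableName = "temp_table";

    // Anonymous inner class using outer state: the compiler stores a
    // reference to `this`, so serializing the task also tries to
    // serialize the non-serializable enclosing object.
    Task anonymousTask() {
        return new Task() {
            public String run() {
                return tableName; // touches outer field -> captures `this`
            }
        };
    }

    // Returns true if the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new StaticTask("temp_table")));                 // true
        System.out.println(serializes(new InnerClassSerialization().anonymousTask())); // false
    }
}
```

So a common fix (not specific to this thread, just the general pattern) is to move the foreachRDD function into a static nested or top-level class and pass anything it needs through the constructor, or to make the enclosing class Serializable and mark non-serializable fields like contexts transient.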