Thanks for pointing to the issue.

Yes, I think it's the same issue. The exception is below:


ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
java.io.NotSerializableException: TestCheckpointStreamingJson
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
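In case it helps narrow things down: a trace like this (NotSerializableException naming the enclosing class, thrown from DStreamGraph.writeObject during checkpointing) typically means an anonymous inner class in the DStream graph is holding a hidden reference to its non-serializable enclosing instance. Below is a minimal, Spark-free sketch of just that mechanism; all class, method, and field names here are illustrative, not taken from the original code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

// Plays the role of the non-serializable enclosing class
// (like TestCheckpointStreamingJson): it does NOT implement Serializable.
public class OuterCaptureDemo {

    interface SerializableCallable extends Callable<String>, Serializable {}

    private final String tableName = "temp_table"; // illustrative instance field

    // Anonymous inner class: because it reads an instance field, it keeps a
    // hidden reference to the enclosing OuterCaptureDemo, so serializing it
    // drags the (non-serializable) outer object along and fails.
    public SerializableCallable anonymousFn() {
        return new SerializableCallable() {
            public String call() { return tableName; }
        };
    }

    // Static nested class: no hidden outer reference, serializes cleanly.
    public static class StaticFn implements SerializableCallable {
        public String call() { return "temp_table"; }
    }

    // Returns true if the object survives Java serialization.
    public static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new OuterCaptureDemo().anonymousFn())); // false
        System.out.println(serializes(new StaticFn()));                       // true
    }
}
```

If that is what is happening here, moving the function passed to foreachRDD into a top-level or static nested class, and referencing only serializable state from inside it, should let the checkpoint write succeed.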


Regards,
  Vasu C

On Thu, Nov 6, 2014 at 1:14 PM, Sean Owen <so...@cloudera.com> wrote:

> You didn't say what isn't serializable or where the exception occurs,
> but is it the same as this issue?
> https://issues.apache.org/jira/browse/SPARK-4196
>
> On Thu, Nov 6, 2014 at 5:42 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> > Dear All,
> >
> > I am getting java.io.NotSerializableException for the code below. If
> > jssc.checkpoint(HDFS_CHECKPOINT_DIR); is commented out, there is no
> > exception. Please help.
> >
> > JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
> >     @Override
> >     public JavaStreamingContext create() {
> >         SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");
> >
> >         final JavaStreamingContext jssc = new JavaStreamingContext(
> >                 sparkConf, new Duration(300));
> >
> >         final JavaHiveContext javahiveContext = new JavaHiveContext(jssc.sc());
> >
> >         javahiveContext.createParquetFile(Bean.class,
> >                 IMPALA_TABLE_LOC, true, new Configuration())
> >                 .registerTempTable(TEMP_TABLE_NAME);
> >
> >         // TODO create checkpoint directory for fault tolerance
> >         final JavaDStream<String> textFileStream = jssc
> >                 .textFileStream(HDFS_FILE_LOC);
> >
> >         textFileStream
> >                 .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
> >                     @Override
> >                     public Void call(JavaRDD<String> rdd, Time time)
> >                             throws Exception {
> >                         if (rdd != null && rdd.count() > 0) {
> >                             JavaSchemaRDD schRdd = javahiveContext.jsonRDD(rdd);
> >                             schRdd.insertInto(TEMP_TABLE_NAME);
> >                         }
> >                         return null;
> >                     }
> >                 });
> >
> >         jssc.checkpoint(HDFS_CHECKPOINT_DIR);
> >         return jssc;
> >     }
> > };
> >
> > // Get JavaStreamingContext from checkpoint data or create a new one
> > JavaStreamingContext context = JavaStreamingContext.getOrCreate(
> >         HDFS_CHECKPOINT_DIR, contextFactory);
> >
> > context.start(); // Start the computation
> > context.awaitTermination();
> >
> >
> >
> > Regards,
> >    Vasu
>
