You didn't say what isn't serializable or where the exception occurs,
but is it the same as this issue?
https://issues.apache.org/jira/browse/SPARK-4196
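
If it's not that, one thing worth checking: with checkpointing enabled, Spark Streaming serializes the DStream graph, including the functions passed to operations like foreachRDD. Anything those functions capture must therefore be serializable too. Your anonymous Function2 captures javahiveContext, which could be the culprit if its class isn't serializable. Below is a plain-Java sketch (no Spark) of that effect. It uses hypothetical stand-ins: FakeHiveContext for JavaHiveContext, SerializableFunction for Function2, and ContextHolder as one possible workaround that looks the context up lazily inside the function instead of capturing it. It only illustrates Java serialization and is not Spark's actual checkpoint code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CheckpointSerializationDemo {

    // Hypothetical stand-in for JavaHiveContext: deliberately NOT Serializable.
    static class FakeHiveContext {
        void insert(String row) { /* no-op in this sketch */ }
    }

    // Hypothetical stand-in for Function2: like Spark's Java function
    // interfaces, it extends Serializable so the function object itself
    // can be written out.
    interface SerializableFunction extends Serializable {
        void call(String row);
    }

    // Hypothetical workaround: a lazily created, JVM-local instance.
    // Static fields are not part of a serialized object's state.
    static class ContextHolder {
        private static FakeHiveContext instance;

        static synchronized FakeHiveContext get() {
            if (instance == null) {
                instance = new FakeHiveContext();
            }
            return instance;
        }
    }

    // Java-serializes the object, roughly what writing a checkpoint
    // does to the functions in the DStream graph.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Captures the context in the closure, as the foreachRDD function
    // captures javahiveContext.
    static SerializableFunction capturing() {
        final FakeHiveContext ctx = new FakeHiveContext();
        return new SerializableFunction() {
            @Override
            public void call(String row) { ctx.insert(row); }
        };
    }

    // Looks the context up at call time, so nothing non-serializable is captured.
    static SerializableFunction lookingUp() {
        return new SerializableFunction() {
            @Override
            public void call(String row) { ContextHolder.get().insert(row); }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + serializes(capturing()));
        System.out.println("lookingUp: " + serializes(lookingUp()));
    }
}
```

Running main reports the NotSerializableException for the capturing version and prints "capturing: false", then "lookingUp: true" for the version that looks the context up lazily.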

On Thu, Nov 6, 2014 at 5:42 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> Dear All,
>
> I am getting java.io.NotSerializableException for the code below. If
> jssc.checkpoint(HDFS_CHECKPOINT_DIR); is commented out, there is no exception.
> Please help.
>
> JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
>     @Override
>     public JavaStreamingContext create() {
>         SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");
>
>         final JavaStreamingContext jssc = new JavaStreamingContext(
>                 sparkConf, new Duration(300));
>
>         final JavaHiveContext javahiveContext = new JavaHiveContext(jssc.sc());
>
>         javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true,
>                 new Configuration())
>                 .registerTempTable(TEMP_TABLE_NAME);
>
>         // TODO create checkpoint directory for fault tolerance
>         final JavaDStream<String> textFileStream = jssc.textFileStream(HDFS_FILE_LOC);
>
>         textFileStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
>             @Override
>             public Void call(JavaRDD<String> rdd, Time time) throws Exception {
>                 if (rdd != null) {
>                     if (rdd.count() > 0) {
>                         JavaSchemaRDD schRdd = javahiveContext.jsonRDD(rdd);
>                         schRdd.insertInto(TEMP_TABLE_NAME);
>                     }
>                 }
>                 return null;
>             }
>         });
>         jssc.checkpoint(HDFS_CHECKPOINT_DIR);
>         return jssc;
>     }
> };
>
> // Get JavaStreamingContext from checkpoint data or create a new one
> JavaStreamingContext context = JavaStreamingContext.getOrCreate(
>         HDFS_CHECKPOINT_DIR, contextFactory);
>
> context.start(); // Start the computation
> context.awaitTermination();
>
> Regards,
>    Vasu
