Dear all, I am getting a java.io.NotSerializableException from the code below. If I comment out `jssc.checkpoint(HDFS_CHECKPOINT_DIR);`, there is no exception. Please help.
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3"); final JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, new Duration(300)); final JavaHiveContext javahiveContext = new JavaHiveContext( jssc.sc()); javahiveContext.createParquetFile(Bean.class, IMPALA_TABLE_LOC, true, new Configuration()) .registerTempTable(TEMP_TABLE_NAME); // TODO create checkpoint directory for fault tolerance final JavaDStream<String> textFileStream = jssc .textFileStream(HDFS_FILE_LOC); textFileStream .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() { @Override public Void call(JavaRDD<String> rdd, Time time) throws Exception { if (rdd != null) { if (rdd.count() > 0) { JavaSchemaRDD schRdd = javahiveContext .jsonRDD(rdd); schRdd.insertInto(TEMP_TABLE_NAME); } } return null; } }); jssc.checkpoint(HDFS_CHECKPOINT_DIR); return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate( HDFS_CHECKPOINT_DIR, contextFactory); context.start(); // Start the computation context.awaitTermination(); Regards, Vasu