Dear All,

I am getting a java.io.NotSerializableException for the code below. If I
comment out jssc.checkpoint(HDFS_CHECKPOINT_DIR); there is no exception.
Please help.

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        SparkConf sparkConf = new SparkConf().set("spark.cores.max", "3");

        final JavaStreamingContext jssc = new JavaStreamingContext(
                sparkConf, new Duration(300));

        final JavaHiveContext javahiveContext = new JavaHiveContext(
                jssc.sc());

        javahiveContext.createParquetFile(Bean.class,
                IMPALA_TABLE_LOC, true, new Configuration())
                .registerTempTable(TEMP_TABLE_NAME);

        // TODO create checkpoint directory for fault tolerance
        final JavaDStream<String> textFileStream = jssc
                .textFileStream(HDFS_FILE_LOC);

        textFileStream
                .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {

                    @Override
                    public Void call(JavaRDD<String> rdd, Time time)
                            throws Exception {
                        if (rdd != null) {
                            if (rdd.count() > 0) {
                                JavaSchemaRDD schRdd = javahiveContext
                                        .jsonRDD(rdd);
                                schRdd.insertInto(TEMP_TABLE_NAME);
                            }
                        }
                        return null;
                    }
                });
        jssc.checkpoint(HDFS_CHECKPOINT_DIR);
        return jssc;
    }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(
        HDFS_CHECKPOINT_DIR, contextFactory);

context.start(); // Start the computation
context.awaitTermination();
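
One possible cause, sketched here without Spark so it can be run on its own:
when checkpointing is enabled, Spark Streaming serializes the DStream graph,
including the function passed to foreachRDD. That anonymous function captures
javahiveContext (and possibly the enclosing factory instance as well), and
neither is expected to be serializable. The sketch below only imitates this
with plain Java serialization. FakeContext, Callback, ContextHolder and
CheckpointCaptureDemo are hypothetical stand-ins, not Spark classes, and the
lazy-lookup variant is one common workaround rather than a confirmed fix for
the code above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CheckpointCaptureDemo {

    // Hypothetical stand-in for JavaHiveContext: deliberately NOT Serializable.
    static class FakeContext {
        void insert(String row) { /* no-op for the demo */ }
    }

    // Hypothetical stand-in for Spark's Function2: a Serializable callback.
    interface Callback extends Serializable {
        void call(String row);
    }

    // Lazily created singleton, so callbacks can look the context up at call
    // time instead of holding a reference to it.
    static class ContextHolder {
        private static FakeContext instance;

        static synchronized FakeContext get() {
            if (instance == null) {
                instance = new FakeContext();
            }
            return instance;
        }
    }

    // Mirrors the posted code: the anonymous class captures the local context,
    // which becomes a hidden field of the callback object.
    static Callback capturing() {
        final FakeContext ctx = new FakeContext();
        return new Callback() {
            @Override
            public void call(String row) {
                ctx.insert(row);
            }
        };
    }

    // Workaround sketch: nothing non-serializable is captured.
    static Callback lazy() {
        return new Callback() {
            @Override
            public void call(String row) {
                ContextHolder.get().insert(row);
            }
        };
    }

    // Returns true if Java serialization accepts the object, false if it
    // throws NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + serializes(capturing()));
        System.out.println("lazy:      " + serializes(lazy()));
    }
}
```

If this is indeed what happens, the equivalent change in the Spark code would
be to obtain the Hive context inside call() rather than referencing the
javahiveContext local from the enclosing method.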



Regards,
   Vasu
