Hi Sean,

Below is my Java code. I am using Spark 1.1.0 and still getting the same error. The Bean class is serializable, so I am not sure where exactly the problem is. What am I doing wrong here?

public class StreamingJson {

    public static void main(String[] args) throws Exception {
        final String HDFS_FILE_LOC = args[0];
        final String IMPALA_TABLE_LOC = args[1];
        final String TEMP_TABLE_NAME = args[2];
        final String HDFS_CHECKPOINT_DIR = args[3];

        JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                SparkConf sparkConf = new SparkConf().setAppName(
                        "test").set("spark.cores.max", "3");

                final JavaStreamingContext jssc = new JavaStreamingContext(
                        sparkConf, new Duration(500));

                final JavaHiveContext javahiveContext = new JavaHiveContext(
                        jssc.sc());

                javahiveContext.createParquetFile(Bean.class,
                        IMPALA_TABLE_LOC, true, new Configuration())
                        .registerTempTable(TEMP_TABLE_NAME);

                final JavaDStream<String> textFileStream = jssc
                        .textFileStream(HDFS_FILE_LOC);

                textFileStream
                        .foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
                            @Override
                            public Void call(JavaRDD<String> rdd, Time time)
                                    throws Exception {
                                if (rdd != null) {
                                    if (rdd.count() > 0) {
                                        JavaSchemaRDD schRdd = javahiveContext
                                                .jsonRDD(rdd);
                                        schRdd.insertInto(TEMP_TABLE_NAME);
                                    }
                                }
                                return null;
                            }
                        });

                jssc.checkpoint(HDFS_CHECKPOINT_DIR);
                return jssc;
            }
        };

        JavaStreamingContext context = JavaStreamingContext.getOrCreate(
                HDFS_CHECKPOINT_DIR, contextFactory);
        context.start(); // Start the computation
        context.awaitTermination();
    }
}

Regards,
Vasu C

On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen <so...@cloudera.com> wrote:
> No, not the same thing then. This just means you accidentally have a
> reference to the unserializable enclosing test class in your code.
> Just make sure the reference is severed.
>
> On Thu, Nov 6, 2014 at 8:00 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> > Thanks for pointing to the issue.
> >
> > Yes I think its the same issue, below is Exception
> >
> >
> > ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
> > java.io.NotSerializableException: TestCheckpointStreamingJson
>
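
For anyone following the thread, the advice quoted above (sever the reference to the unserializable enclosing class) can be illustrated without Spark at all. The sketch below uses only java.io serialization. Every name in it (ClosureCapture, Task, Driver, StandaloneTask, canSerialize) is hypothetical and made up for illustration; Task merely stands in for a Spark function interface, and this is not a diagnosis of the code posted above. An anonymous class that touches a field of its enclosing instance drags that instance into the serialized object graph, while a static nested class that receives its values through the constructor does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCapture {

    // Hypothetical stand-in for a Spark function interface (those are Serializable).
    interface Task extends Serializable {
        String call(String input);
    }

    // Hypothetical driver class; deliberately NOT Serializable.
    static class Driver {
        private final String tableName;

        Driver(String tableName) {
            this.tableName = tableName;
        }

        // The anonymous class reads a field of the enclosing Driver, so it
        // holds a hidden reference to that Driver instance.
        Task anonymousTask() {
            return new Task() {
                @Override
                public String call(String input) {
                    return tableName + ":" + input;
                }
            };
        }
    }

    // Static nested class: no enclosing instance, only the String it is given.
    static class StandaloneTask implements Task {
        private final String tableName;

        StandaloneTask(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public String call(String input) {
            return tableName + ":" + input;
        }
    }

    // Returns true if Java serialization accepts the whole object graph.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous:  " + canSerialize(new Driver("t").anonymousTask()));
        System.out.println("standalone: " + canSerialize(new StandaloneTask("t")));
    }
}
```

The same idea should carry over to a function passed to foreachRDD: move it into a static nested or top-level class and hand it only serializable values, rather than letting it close over non-serializable objects from the surrounding scope.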