Hi Sean,

Below is my Java code. I am using Spark 1.1.0 and still getting the same error.
The Bean class is serializable, so I am not sure where exactly the problem is.
What am I doing wrong here?

public class StreamingJson {
    public static void main(String[] args) throws Exception {
        final String HDFS_FILE_LOC = args[0];
        final String IMPALA_TABLE_LOC = args[1];
        final String TEMP_TABLE_NAME = args[2];
        final String HDFS_CHECKPOINT_DIR = args[3];

        JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                SparkConf sparkConf = new SparkConf().setAppName("test")
                        .set("spark.cores.max", "3");

                final JavaStreamingContext jssc = new JavaStreamingContext(
                        sparkConf, new Duration(500));

                final JavaHiveContext javahiveContext = new JavaHiveContext(
                        jssc.sc());

                javahiveContext.createParquetFile(Bean.class,
                        IMPALA_TABLE_LOC, true, new Configuration())
                        .registerTempTable(TEMP_TABLE_NAME);

                final JavaDStream<String> textFileStream = jssc
                        .textFileStream(HDFS_FILE_LOC);

                textFileStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
                    @Override
                    public Void call(JavaRDD<String> rdd, Time time)
                            throws Exception {
                        if (rdd != null) {
                            if (rdd.count() > 0) {
                                JavaSchemaRDD schRdd = javahiveContext.jsonRDD(rdd);
                                schRdd.insertInto(TEMP_TABLE_NAME);
                            }
                        }
                        return null;
                    }
                });
                jssc.checkpoint(HDFS_CHECKPOINT_DIR);
                return jssc;
            }
        };
        JavaStreamingContext context = JavaStreamingContext.getOrCreate(
                HDFS_CHECKPOINT_DIR, contextFactory);
        context.start(); // Start the computation
        context.awaitTermination();
    }
}



Regards,
   Vasu C

On Thu, Nov 6, 2014 at 1:33 PM, Sean Owen <so...@cloudera.com> wrote:

> No, not the same thing then. This just means you accidentally have a
> reference to the unserializable enclosing test class in your code.
> Just make sure the reference is severed.
>
> On Thu, Nov 6, 2014 at 8:00 AM, Vasu C <vasuc.bigd...@gmail.com> wrote:
> > Thanks for pointing to the issue.
> >
> > Yes, I think it's the same issue. The exception is below:
> >
> >
> > ERROR OneForOneStrategy: TestCheckpointStreamingJson$1
> > java.io.NotSerializableException: TestCheckpointStreamingJson
>
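
The quoted advice about severing the reference to the enclosing class can be illustrated without Spark. The following is only a minimal sketch using plain Java serialization, which is roughly what happens when functions are written out for a checkpoint. All names in it (SeverReferenceSketch, Task, Standalone, prefix) are hypothetical and do not come from the code in this thread. An anonymous inner class that touches a member of its enclosing instance carries a hidden reference to that instance, so serializing it fails when the enclosing class is not Serializable. A static nested class that copies only the values it needs has no such reference.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical enclosing class; deliberately NOT Serializable.
public class SeverReferenceSketch {

    // Hypothetical stand-in for a serializable function interface.
    public interface Task extends Serializable {
        String call(String in);
    }

    private final String prefix = "row:";

    // Anonymous inner class that reads an outer field. It keeps a hidden
    // reference to the enclosing SeverReferenceSketch instance.
    public Task capturing() {
        return new Task() {
            @Override
            public String call(String in) {
                return prefix + in; // reads the outer instance's field
            }
        };
    }

    // Static nested class: no hidden outer reference. The value it needs
    // is copied into its own field instead.
    static final class Standalone implements Task {
        private final String prefix;

        Standalone(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String call(String in) {
            return prefix + in;
        }
    }

    public Task severed() {
        return new Standalone(prefix);
    }

    // Tries Java serialization and reports whether it succeeded.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SeverReferenceSketch outer = new SeverReferenceSketch();
        System.out.println("anonymous inner class: "
                + isSerializable(outer.capturing()));
        System.out.println("static nested class:   "
                + isSerializable(outer.severed()));
    }
}
```

In a streaming job the same idea would mean creating the function objects from a static context, or from static nested classes, and passing in only serializable values. Whether that alone resolves the error above depends on what else the functions capture.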
