Please note: I have also asked this question on Stack Overflow:
http://stackoverflow.com/questions/41729451/adding-to-spark-streaming-dstream-rdd-the-max-line-of-each-rdd

I am trying to add to each RDD in a JavaDStream the line with the maximum
timestamp, with some modification. However, I keep getting a serialization
error, even though I do not have any class that is not serializable.

Below is the code sample that throws the error:

    JavaDStream<LogMessage> logMessageWithHB = logMessageMatched.transform(
            new Function<JavaRDD<LogMessage>, JavaRDD<LogMessage>>() {
        @Override
        public JavaRDD<LogMessage> call(JavaRDD<LogMessage> logMessageJavaRDD) throws Exception {

            LogMessage max = logMessageJavaRDD.max(LogMessageComparator);
            List<LogMessage> tempList = new ArrayList<LogMessage>();
            max.convertToHBLogMessage();
            tempList.add(max);

            JavaRDD<LogMessage> parallelize = ssc.sparkContext().parallelize(tempList);
            JavaRDD<LogMessage> union = logMessageJavaRDD.union(parallelize);

            return union;
        }
    });

The following is the error, but it does not really tell me the root cause:

    java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
    org.apache.spark.streaming.api.java.JavaStreamingContext
    Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@4b62f99b)
    - field (class: org.necla.ngla.loganalyzer.stateful.Type9.Type9ViolationChecker$6, name: val$ssc, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class org.necla.ngla.loganalyzer.stateful.Type9.Type9ViolationChecker$6, org.necla.ngla.loganalyzer.stateful.Type9.Type9ViolationChecker$6@6ced9607)
    - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1, name: transformFunc$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transform$1, <function1>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
    - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
    - object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@1df376a4)
    - field (class: org.apache.spark.streaming.dstream.MappedDStream, name: parent, type: class org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.MappedDStream, org.apache.spark.streaming.dstream.MappedDStream@68926b04)
    - field (class: org.apache.spark.streaming.dstream.ShuffledDStream, name: parent, type: class org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.ShuffledDStream, org.apache.spark.streaming.dstream.ShuffledDStream@1aa732d9)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.MapValuedDStream, org.apache.spark.streaming.dstream.MapValuedDStream@39579072)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
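
From the serialization stack, it looks like the closure I pass to transform()
is capturing the JavaStreamingContext through the ssc variable (the val$ssc
field in the trace), and the context itself is not serializable. Below is a
sketch of a workaround I am considering, assuming the same LogMessage,
LogMessageComparator, and logMessageMatched as in my code above: derive a
JavaSparkContext from the RDD inside the function instead of referencing ssc.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // Same transform as above, but the closure no longer references ssc.
    JavaDStream<LogMessage> logMessageWithHB = logMessageMatched.transform(
            new Function<JavaRDD<LogMessage>, JavaRDD<LogMessage>>() {
        @Override
        public JavaRDD<LogMessage> call(JavaRDD<LogMessage> logMessageJavaRDD) throws Exception {

            LogMessage max = logMessageJavaRDD.max(LogMessageComparator);
            max.convertToHBLogMessage();

            List<LogMessage> tempList = new ArrayList<LogMessage>();
            tempList.add(max);

            // Get the context from the RDD itself, so the anonymous class
            // does not capture the non-serializable JavaStreamingContext.
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(logMessageJavaRDD.context());
            return logMessageJavaRDD.union(jsc.parallelize(tempList));
        }
    });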

Can anyone confirm whether that is the right approach, or suggest what else I
am doing wrong? Please let me know if you need more information.

Thanks
Nipun
