For prototyping purposes, I created a test program that injects its dependencies using Spring.
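
The wiring is essentially the following (a simplified sketch of my configuration; the broker address, topic name, and batch interval are placeholder values, not my real settings):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamingConfig {

    @Bean(name = "streamingContext")
    public JavaStreamingContext streamingContext() {
        // Placeholder app name and batch interval.
        SparkConf conf = new SparkConf().setAppName("WordCountApp");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }

    @Bean(name = "inputDStream")
    public JavaPairInputDStream<String, String> inputDStream(
            @Qualifier("streamingContext") JavaStreamingContext ssc) {
        // Placeholder broker and topic; the real values come from properties.
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("test");
        return KafkaUtils.createDirectStream(
                ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);
    }
}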

Nothing fancy; it is just a rewrite of KafkaDirectWordCount. When I run it, I get the following exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
    at com.olacabs.spark.examples.WordCountProcessorKafkaImpl.process(WordCountProcessorKafkaImpl.java:45)
    at com.olacabs.spark.examples.WordCountApp.main(WordCountApp.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@7add323c)
    - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl, name: streamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl, com.olacabs.spark.examples.WordCountProcessorKafkaImpl@29a1505c)
    - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, name: this$0, type: class com.olacabs.spark.examples.WordCountProcessorKafkaImpl)
    - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1@c6c82aa)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 23 more

Can someone help me figure out why?

Here is the code:

import java.io.Serializable;

public interface EventProcessor extends Serializable {
    void process();
}
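
The driver, WordCountApp (WordCountApp.java:49 in the trace), just bootstraps Spring and calls process(). Roughly this, using the StreamingConfig shown above (a simplified sketch; the bean registration details are trimmed for the post):

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class WordCountApp {

    public static void main(String[] args) {
        // Register the config class and the processor so its @Autowired
        // fields get injected, then kick off the streaming job.
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(
                        StreamingConfig.class, WordCountProcessorKafkaImpl.class);
        EventProcessor processor = context.getBean(EventProcessor.class);
        processor.process(); // WordCountApp.java:49 in the stack trace
    }
}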
import java.util.regex.Pattern;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class WordCountProcessorKafkaImpl implements EventProcessor {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    @Qualifier("streamingContext")
    JavaStreamingContext streamingContext;

    @Autowired
    @Qualifier("inputDStream")
    JavaPairInputDStream<String, String> inputDStream;

    @Override
    public void process() {
        // Get the lines, split them into words, count the words and print.
        // This map call is WordCountProcessorKafkaImpl.java:45 in the stack trace.
        JavaDStream<String> lines = inputDStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();

        // Start the computation
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
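
Reading the serialization stack, my current understanding is that each anonymous inner class carries an implicit this$0 reference to WordCountProcessorKafkaImpl, so serializing the map function drags in the whole processor, including the non-serializable JavaStreamingContext. Below is a sketch of the fix I am considering; the static nested class and the transient fields are my own guesses at the idiom, not something I have confirmed:

import java.util.regex.Pattern;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import scala.Tuple2;

public class WordCountProcessorKafkaImpl implements EventProcessor {

    private static final Pattern SPACE = Pattern.compile(" ");

    // transient so Java serialization skips these fields if the processor
    // itself ever ends up inside a closure; Spring injects via reflection,
    // so transient does not affect autowiring.
    @Autowired
    @Qualifier("streamingContext")
    transient JavaStreamingContext streamingContext;

    @Autowired
    @Qualifier("inputDStream")
    transient JavaPairInputDStream<String, String> inputDStream;

    // Static nested class: no implicit this$0, so serializing it does not
    // pull in the processor or the streaming context.
    private static class ExtractValue implements Function<Tuple2<String, String>, String> {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    }

    @Override
    public void process() {
        JavaDStream<String> lines = inputDStream.map(new ExtractValue());
        // ... the flatMap / mapToPair / reduceByKey functions would be
        // extracted into static nested classes in the same way ...
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

Is this the recommended approach, or is there a cleaner way to combine Spring injection with Spark closures?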
