For prototyping purposes, I created a test program that injects dependencies using Spring.
Nothing fancy. This is just a re-write of KafkaDirectWordCount. When I run this, I get the following exception: Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528) at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.SparkContext.withScope(SparkContext.scala:681) at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258) at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527) at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157) at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43) at com.olacabs.spark.examples.WordCountProcessorKafkaImpl.process(WordCountProcessorKafkaImpl.java:45) at com.olacabs.spark.examples.WordCountApp.main(WordCountApp.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects. Serialization stack: - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@7add323c) - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl, name: streamingContext, type: class org.apache.spark.streaming.api.java.JavaStreamingContext) - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl, com.olacabs.spark.examples.WordCountProcessorKafkaImpl@29a1505c) - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, name: this$0, type: class com.olacabs.spark.examples.WordCountProcessorKafkaImpl) - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1@c6c82aa) - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) ... 23 more Can someone help me figure out why? 
Here is the Code : public interface EventProcessor extends Serializable { void process(); } public class WordCountProcessorKafkaImpl implements EventProcessor { private static final Pattern SPACE = Pattern.compile(" "); @Autowired @Qualifier("streamingContext") JavaStreamingContext streamingContext; @Autowired @Qualifier("inputDStream") JavaPairInputDStream<String, String> inputDStream; @Override public void process() { // Get the lines, split them into words, count the words and print JavaDStream<String> lines = inputDStream.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey( new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); // Start the computation streamingContext.start(); streamingContext.awaitTermination(); } }