I am guessing one of two things might work:

1. Define the pattern SPACE inside process(), or
2. Mark the streamingContext and inputDStream fields as transient.

The problem is that a function like PairFunction needs to be serialized
so it can be sent to the tasks. The whole closure of the function is
serialized, and because your functions are anonymous inner classes, each
one holds an implicit reference to the enclosing instance, so the closure
captures the whole WordCountProcessorKafkaImpl, including its
non-serializable JavaStreamingContext field.
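For example, option 2 would look roughly like this (just a sketch of your
class with the two fields marked transient, nothing else changed; it
assumes Spring injects the fields on the driver before process() runs, so
the executors never need them):

    public class WordCountProcessorKafkaImpl implements EventProcessor {

        private static final Pattern SPACE = Pattern.compile(" ");

        // transient: skipped by Java serialization, so serializing the
        // captured outer instance no longer pulls in the non-serializable
        // JavaStreamingContext and DStream
        @Autowired
        @Qualifier("streamingContext")
        transient JavaStreamingContext streamingContext;

        @Autowired
        @Qualifier("inputDStream")
        transient JavaPairInputDStream<String, String> inputDStream;

        // process() body unchanged from your code
    }

If you would rather not rely on transient, you can also break the capture
itself: a static nested class has no hidden reference to the enclosing
instance, so only the function object gets serialized. For instance
(untested sketch):

    private static final class ToLine
            implements Function<Tuple2<String, String>, String> {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    }

and then inputDStream.map(new ToLine()) inside process().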


On Mon, Jun 29, 2015 at 5:14 AM, Spark Enthusiast <sparkenthusi...@yahoo.in>
wrote:

> For prototyping purposes, I created a test program injecting dependencies
> using Spring.
>
> Nothing fancy. This is just a re-write of KafkaDirectWordCount. When I run
> this, I get the following exception:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>     at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>     at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>     at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
>     at
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
>     at
> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
>     at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>     at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
>     at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
>     at
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
>     at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
>     at
> org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
>     at
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
>     at
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl.process(WordCountProcessorKafkaImpl.java:45)
>     at com.olacabs.spark.examples.WordCountApp.main(WordCountApp.java:49)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>     at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: Object of
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being
> serialized  possibly as a part of closure of an RDD operation. This is
> because  the DStream object is being referred to from within the closure.
> Please rewrite the RDD operation inside this DStream to avoid this.  This
> has been enforced to avoid bloating of Spark tasks  with unnecessary
> objects.
> Serialization stack:
>     - object not serializable (class:
> org.apache.spark.streaming.api.java.JavaStreamingContext, value:
> org.apache.spark.streaming.api.java.JavaStreamingContext@7add323c)
>     - field (class:
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl, name:
> streamingContext, type: class
> org.apache.spark.streaming.api.java.JavaStreamingContext)
>     - object (class
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl,
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl@29a1505c)
>     - field (class:
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, name: this$0,
> type: class com.olacabs.spark.examples.WordCountProcessorKafkaImpl)
>     - object (class
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1,
> com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1@c6c82aa)
>     - field (class:
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name:
> fun$1, type: interface org.apache.spark.api.java.function.Function)
>     - object (class
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1,
> <function1>)
>     at
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>     at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>     at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>     ... 23 more
>
>
>
> Can someone help me figure out why?
>
>
> Here is the Code :
>
> public interface EventProcessor extends Serializable {
>     void process();
> }
>
>
> public class WordCountProcessorKafkaImpl implements EventProcessor {
>
>     private static final Pattern SPACE = Pattern.compile(" ");
>
>     @Autowired
>     @Qualifier("streamingContext")
>     JavaStreamingContext streamingContext;
>
>     @Autowired
>     @Qualifier("inputDStream")
>     JavaPairInputDStream<String, String> inputDStream;
>
>     @Override
>     public void process() {
>         // Get the lines, split them into words, count the words and print
>         JavaDStream<String> lines = inputDStream.map(new 
> Function<Tuple2<String, String>, String>() {
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>             }
>         });
>         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, 
> String>() {
>             @Override
>             public Iterable<String> call(String x) {
>                 return Lists.newArrayList(SPACE.split(x));
>             }
>         });
>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>                 new PairFunction<String, String, Integer>() {
>                     @Override
>                     public Tuple2<String, Integer> call(String s) {
>                         return new Tuple2<String, Integer>(s, 1);
>                     }
>                 }).reduceByKey(
>                 new Function2<Integer, Integer, Integer>() {
>                     @Override
>                     public Integer call(Integer i1, Integer i2) {
>                         return i1 + i2;
>                     }
>                 });
>         wordCounts.print();
>
>         // Start the computation
>         streamingContext.start();
>         streamingContext.awaitTermination();
>
>     }
> }
>
>
>