The problem is not ReduceWords, since it is already Serializable by
implementing Function2. Indeed the error tells you just what is
unserializable: KafkaStreamingWordCount, your driver class.

Something is causing a reference to the containing class to be
serialized in the closure. The best fix is to not do this. Usually the
culprit is an inner class, possibly anonymous, that is non-static.
These contain a hidden reference to the containing class, through
which you may be referring to one of its members. If not, it's still
possible the closure cleaner isn't removing the reference even though
it could.

Is ReduceWords actually an inner class?

Or on another tangent, when you remove reduceByKey, you are also
removing print? that would cause it to do nothing, which of course
generates no error.

On Wed, Oct 15, 2014 at 12:11 AM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>
> Hi All,
>
> I am trying to understand what is going on in my simple WordCount Spark
> Streaming application. Here is the setup -
>
> I have a Kafka producer that is streaming words (lines of text). On the flip
> side, I have a spark streaming application that uses the high-level
> Kafka/Spark connector to read in these messages from the kafka topic. The
> code is straightforward  -
> Using CDH5.1.3 distribution and submitting the job to a yarn cluster
>
>
> SparkConf sparkConf = new
> SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
> sparkConf.set("spark.shuffle.manager", "SORT");
> sparkConf.set("spark.streaming.unpersist", "true");
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> Duration(1000));
> Map<String, String> kafkaConf = new HashMap<String, String>();
> kafkaConf.put("zookeeper.connect", zookeeper);
> kafkaConf.put("group.id", consumerGrp);
> kafkaConf.put("auto.offset.reset", "largest");
> kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
> kafkaConf.put("rebalance.max.retries", "20");
> kafkaConf.put("rebalance.backoff.ms", "30000");
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> topicMap.put(topic, 1);
> List<JavaPairDStream<byte[], String>> kafkaStreams = new
> ArrayList<JavaPairDStream<byte[], String>>();
> for(int i = 0; i < numPartitions; i++) {
> kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
> DefaultDecoder.class, PayloadDeSerializer.class,
> kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new
> PairFunction<Tuple2<byte[],String>, byte[], String>() {
>
> private static final long serialVersionUID = -1936810126415608167L;
>
> public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws
> Exception {
> return tuple2;
> }
> }
> )
> );
> }
>
> JavaPairDStream<byte[], String> unifiedStream;
> if (kafkaStreams.size() > 1) {
> unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> kafkaStreams.size()));
> } else {
> unifiedStream = kafkaStreams.get(0);
> }
> JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
> JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
> wordMap = wordMap.filter(new wordFilter());
> JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new
> ReduceWords());
> wordCount.print();
> jssc.start();
> jssc.awaitTermination();
> return 0;
>
> If I remove the code (highlighted) "JavaPairDStream<String, Integer>
> wordCount = wordMap.reduceByKey(new ReduceWords());", the application works
> just fine...
> The moment I introduce the "reduceBykey", I start getting the following
> error and spark streaming shuts down -
>
> 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job
> 1413323925000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not
> serializable: java.io.NotSerializableException: KafkaStreamingWordCount
>         at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         .....
> .....
>
> 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage
> 2
> java.util.NoSuchElementException: key not found: Stage 2
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:58)
>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
> .....
> .....
>
>
> My assumption as to why it is failing is the following -
>
> The producer application is not continuously streaming data. There are
> periods where there is no data being produced.  On the flip side, Spark
> Streaming is generating DStream every one second. This DStreams is comprised
> of RDDs with no data associated with them. Hence, I am wondering if this
> would cause the "reduceByKey" transformation to throw an exception...
>
> Here are some general questions -
> (1) What happens when there is no data in the stream.... In terms of DStream
> and underlying RDD?
> (2) Since DStreams are just a wrapper around all individual RDD for a
> particular slice of time, I am assuming that these RDD are associated with
> an empty dataset. Is this correct?
> (3) What is a generally acceptable solution to weed out these RDD that do
> not have any data associated with them.
>
>
> Regards,
> - Jacob

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to