Hi All,

I figured out what the problem was. Thank you, Sean, for pointing me in the right direction. All the jibber-jabber about empty DStreams / RDDs was pure nonsense. The sequence of events (Spark Streaming started crashing right after I implemented the reduceByKey) and my reading of the log file led me to believe that there was something wrong with the way I implemented the "reduceByKey". In fact, there was nothing wrong with the reduceByKey implementation. Just for closure (no pun intended), I will try to explain what happened. Maybe it will help someone else in the future.

Initially, my driver code had this definition -

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "30000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for(int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[],String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }
        )
    );
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
wordMap.print();
jssc.start();
jssc.awaitTermination();

The above code does not have a reduceByKey. All I was doing was printing out the pair [<String>, 1], and things worked perfectly fine.

I started Spark Streaming and then started the Kafka producer, and in the logs I could see the results. So far so good. Then I proceeded to introduce the "reduceByKey" to count the words in each batch. I created a ReduceWords.java file with the class ReduceWords, defined as follows -

public class ReduceWords implements Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer i1, Integer i2) throws Exception {
        return i1 + i2;
    }
}

and in my driver code, I introduced reduceByKey as follows -

...
...
...
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for(int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[],String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }
        )
    );
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();
jssc.start();
jssc.awaitTermination();

This is when I started getting the exceptions and Spark started to crash. So my instinct was to presume that something about reduceByKey was at fault.
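
As a side note on what the reduceByKey step is meant to compute, here is a rough plain-Java sketch with no Spark involved. The class name ReduceByKeySketch and its methods are made up for illustration; only the i1 + i2 combining logic comes from ReduceWords. It folds (word, 1) pairs by key, and an empty batch simply produces an empty result. This is only an analogy for the per-batch word count, not a description of how Spark executes it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch; not part of the streaming application.
public class ReduceByKeySketch {

    // Same combining logic as ReduceWords.call, without the Spark interface.
    public static Integer reduce(Integer i1, Integer i2) {
        return i1 + i2;
    }

    // Folds all values that share a key using reduce(); TreeMap keeps keys sorted.
    public static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), ReduceByKeySketch::reduce);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> batch = new ArrayList<Map.Entry<String, Integer>>();
        batch.add(new SimpleEntry<String, Integer>("spark", 1));
        batch.add(new SimpleEntry<String, Integer>("kafka", 1));
        batch.add(new SimpleEntry<String, Integer>("spark", 1));

        System.out.println(reduceByKey(batch)); // prints {kafka=1, spark=2}
        // An empty batch just yields an empty map.
        System.out.println(reduceByKey(new ArrayList<Map.Entry<String, Integer>>())); // prints {}
    }
}
```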

Then Sean pointed out that a reference to the containing class may have been serialized in the closure. But ReduceWords is just a regular class in its own Java file; it is not an inner or anonymous class. This was what stumped me. I just could not figure out how ReduceWords could reference the driver class in any shape or form. The problem, it turns out, was the following -

...
...
...
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for(int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[],String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }
        )
    );
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();
jssc.start();
jssc.awaitTermination();

The anonymous PairFunction passed to mapToPair in the code above (highlighted in the original message) is the same code that worked perfectly fine without the reduceByKey, and this was what was causing the problem.
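
For anyone who wants to see the hidden reference in isolation, here is a minimal sketch in plain Java, with no Spark involved. The names ClosureCaptureDemo, Driver, PassThrough and StandaloneFunction are made up for illustration; Driver stands in for a non-serializable driver class like KafkaStreamingWordCount, and PassThrough stands in for a serializable function interface. It assumes the anonymous class is created inside an instance method, which is my understanding of what was happening in my driver.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-alone demo; none of these names exist in Spark.
public class ClosureCaptureDemo {

    // Stand-in for a serializable function interface.
    public interface PassThrough extends Serializable {
        String call(String input);
    }

    // Stand-in for the driver class: deliberately NOT Serializable.
    public static class Driver {

        // Anonymous class created in an instance method: it carries a hidden
        // reference to the enclosing Driver, even though call() never uses it.
        public PassThrough anonymousFunction() {
            return new PassThrough() {
                private static final long serialVersionUID = 1L;

                public String call(String input) {
                    return input;
                }
            };
        }

        // Class with no enclosing instance: nothing extra gets dragged along.
        public PassThrough standaloneFunction() {
            return new StandaloneFunction();
        }
    }

    // Static nested here for brevity; a class in its own file behaves the same way.
    public static class StandaloneFunction implements PassThrough {
        private static final long serialVersionUID = 1L;

        public String call(String input) {
            return input;
        }
    }

    // Tries Java serialization and reports whether it succeeded.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object in the graph is not Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Driver driver = new Driver();
        System.out.println("anonymous:  " + isSerializable(driver.anonymousFunction()));
        System.out.println("standalone: " + isSerializable(driver.standaloneFunction()));
    }
}
```

In this sketch the anonymous function should fail to serialize because writing it also tries to write the enclosing Driver, while the standalone one serializes fine. That matches the shape of the NotSerializableException naming the driver class rather than the function.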

I then created a separate Java file with a class in it -

public class MapInputs implements PairFunction<Tuple2<byte[],String>, byte[], String> {

    private static final long serialVersionUID = -5646014065605843441L;

    public Tuple2<byte[], String> call(Tuple2<byte[], String> input) throws Exception {
        return input;
    }
}

and then removed the anonymous class -

for(int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new MapInputs()));
}

When I ran this, it worked perfectly fine. Removing the original anonymous function .mapToPair(new PairFunction<Tuple2<byte[],String>, byte[], String>() { ... }) did the trick.

What I still don't understand is why the anonymous class worked fine without the reduceByKey, whereas the same anonymous class with the reduceByKey caused Spark to serialize the driver. I still don't know the answer to this question. Maybe someone from the community can answer. Not knowing the internals of Spark, I can only speculate that introducing 'reduceByKey' caused the Spark optimizer to create a different DAG that caused the serialization of the driver.

Anyway, lesson learned: "be EXTRA CAREFUL with closures, especially inner classes and anonymous classes".

Regards,
-Jacob

On Tue, Oct 14, 2014 at 11:35 PM, Sean Owen <so...@cloudera.com> wrote:

> The problem is not ReduceWords, since it is already Serializable by
> implementing Function2. Indeed the error tells you just what is
> unserializable: KafkaStreamingWordCount, your driver class.
>
> Something is causing a reference to the containing class to be
> serialized in the closure. The best fix is to not do this. Usually the
> culprit is an inner class, possibly anonymous, that is non-static.
> These contain a hidden reference to the containing class, through
> which you may be referring to one of its members.
> If not, it's still possible the closure cleaner isn't removing the
> reference even though it could.
>
> Is ReduceWords actually an inner class?
>
> Or on another tangent, when you remove reduceByKey, you are also
> removing print? that would cause it to do nothing, which of course
> generates no error.
>
> On Wed, Oct 15, 2014 at 12:11 AM, Abraham Jacob <abe.jac...@gmail.com> wrote:
> >
> > Hi All,
> >
> > I am trying to understand what is going on in my simple WordCount Spark
> > Streaming application. Here is the setup -
> >
> > I have a Kafka producer that is streaming words (lines of text). On the flip
> > side, I have a spark streaming application that uses the high-level
> > Kafka/Spark connector to read in these messages from the kafka topic. The
> > code is straightforward -
> > Using CDH5.1.3 distribution and submitting the job to a yarn cluster
> >
> >
> > SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
> > sparkConf.set("spark.shuffle.manager", "SORT");
> > sparkConf.set("spark.streaming.unpersist", "true");
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
> > Map<String, String> kafkaConf = new HashMap<String, String>();
> > kafkaConf.put("zookeeper.connect", zookeeper);
> > kafkaConf.put("group.id", consumerGrp);
> > kafkaConf.put("auto.offset.reset", "largest");
> > kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
> > kafkaConf.put("rebalance.max.retries", "20");
> > kafkaConf.put("rebalance.backoff.ms", "30000");
> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
> > topicMap.put(topic, 1);
> > List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
> > for(int i = 0; i < numPartitions; i++) {
> > kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
> > DefaultDecoder.class, PayloadDeSerializer.class,
> > kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunction<Tuple2<byte[],String>, byte[], String>() {
> >
> > private static final long serialVersionUID = -1936810126415608167L;
> >
> > public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
> > return tuple2;
> > }
> > }
> > )
> > );
> > }
> >
> > JavaPairDStream<byte[], String> unifiedStream;
> > if (kafkaStreams.size() > 1) {
> > unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
> > } else {
> > unifiedStream = kafkaStreams.get(0);
> > }
> > JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
> > JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
> > wordMap = wordMap.filter(new wordFilter());
> > JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
> > wordCount.print();
> > jssc.start();
> > jssc.awaitTermination();
> > return 0;
> >
> > If I remove the code (highlighted) "JavaPairDStream<String, Integer>
> > wordCount = wordMap.reduceByKey(new ReduceWords());", the application works
> > just fine...
> > The moment I introduce the "reduceBykey", I start getting the following
> > error and spark streaming shuts down -
> >
> > 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
> > at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > .....
> > .....
> >
> > 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
> > java.util.NoSuchElementException: key not found: Stage 2
> > at scala.collection.MapLike$class.default(MapLike.scala:228)
> > at scala.collection.AbstractMap.default(Map.scala:58)
> > at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
> > at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> > at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
> > .....
> > .....
> >
> >
> > My assumption as to why it is failing is the following -
> >
> > The producer application is not continuously streaming data. There are
> > periods where there is no data being produced. On the flip side, Spark
> > Streaming is generating DStream every one second. This DStreams is comprised
> > of RDDs with no data associated with them. Hence, I am wondering if this
> > would cause the "reduceByKey" transformation to throw an exception...
> >
> > Here are some general questions -
> > (1) What happens when there is no data in the stream.... In terms of DStream
> > and underlying RDD?
> > (2) Since DStreams are just a wrapper around all individual RDD for a
> > particular slice of time, I am assuming that these RDD are associated with
> > an empty dataset. Is this correct?
> > (3) What is a generally acceptable solution to weed out these RDD that do
> > not have any data associated with them.
> >
> >
> > Regards,
> > - Jacob