Re: Spark Streaming Empty DStream / RDD and reduceByKey
The problem is not ReduceWords, since it is already Serializable by implementing Function2. Indeed the error tells you just what is unserializable: KafkaStreamingWordCount, your driver class. Something is causing a reference to the containing class to be serialized in the closure. The best fix is to not do this. Usually the culprit is an inner class, possibly anonymous, that is non-static. These contain a hidden reference to the containing class, through which you may be referring to one of its members. If not, it's still possible the closure cleaner isn't removing the reference even though it could. Is ReduceWords actually an inner class? Or on another tangent, when you remove reduceByKey, you are also removing print? that would cause it to do nothing, which of course generates no error. On Wed, Oct 15, 2014 at 12:11 AM, Abraham Jacob abe.jac...@gmail.com wrote: Hi All, I am trying to understand what is going on in my simple WordCount Spark Streaming application. Here is the setup - I have a Kafka producer that is streaming words (lines of text). On the flip side, I have a spark streaming application that uses the high-level Kafka/Spark connector to read in these messages from the kafka topic. The code is straightforward - Using CDH5.1.3 distribution and submitting the job to a yarn cluster SparkConf sparkConf = new SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount); sparkConf.set(spark.shuffle.manager, SORT); sparkConf.set(spark.streaming.unpersist, true); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); MapString, String kafkaConf = new HashMapString, String(); kafkaConf.put(zookeeper.connect, zookeeper); kafkaConf.put(group.id, consumerGrp); kafkaConf.put(auto.offset.reset, largest); kafkaConf.put(zookeeper.conection.timeout.ms, 1000); kafkaConf.put(rebalance.max.retries, 20); kafkaConf.put(rebalance.backoff.ms, 3); MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords()); wordCount.print(); jssc.start(); jssc.awaitTermination(); return 0; If I remove the code (highlighted) JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords());, the application works just fine... The moment I introduce the reduceBykey, I start getting the following error and spark streaming shuts down - 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) . . 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2 java.util.NoSuchElementException: key not found: Stage 2 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at
Re: Spark Streaming Empty DStream / RDD and reduceByKey
Hi All, I figured out what the problem was. Thank you Sean for pointing me in the right direction. All the jibber jabber about empty DStream / RDD was all just pure nonsense [?] . I guess the sequence of events (the fact that spark streaming started crashing just after I implemented the reduceBykey) and reading the log file lead me to believe that there was something wrong with the way I implemented the reduceByKey. In fact there was nothing wrong with the reduceByKey implementation. Just for closure (no pun intended), i will try and explain what happened. Maybe it will help someone else in the future. Initially, my driver code had this definition - SparkConf sparkConf = new SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount); sparkConf.set(spark.shuffle.manager, SORT); sparkConf.set(spark.streaming.unpersist, true); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); MapString, String kafkaConf = new HashMapString, String(); kafkaConf.put(zookeeper.connect, zookeeper); kafkaConf.put(group.id, consumerGrp); kafkaConf.put(auto.offset.reset, largest); kafkaConf.put(zookeeper.conection.timeout.ms, 1000); kafkaConf.put(rebalance.max.retries, 20); kafkaConf.put(rebalance.backoff.ms, 3); MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); wordMap.print(); jssc.start(); jssc.awaitTermination(); The above code does not have a reduceByKey. All I was doing was printing out was the pair [String, 1], and things worked perfectly fine. I started spark streaming and then stated the kafka producer and in the logs I could see the results. So far so good. Then I proceeded to introduce the reduceByKey, to count the words in each batch. I created a ReduceWords.java file with the class ReduceWords with the following definition. public class ReduceWords implements Function2Integer, Integer, Integer { private static final long serialVersionUID = -6076139388549335886L; public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } } and in my driver code, I introduced reduceByKey as follows - ... ... ... MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords()); wordCount.print(); jssc.start(); jssc.awaitTermination(); This is when I started getting the exceptions and spark started to crash. So my instinct was to presume that something about reduceByKey was at fault. Then Sean pointed me to the idea that, reference to the containing class may have been serialized in the closure. But the issue was ReduceWords is just a regular class in its own java file. It is not an inner or anonymous class. This was what stumped me. I just could not figure out how ReduceWord could reference in any shape or form the driver class. The problem it turns out was the following - ... ... ... MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new
Spark Streaming Empty DStream / RDD and reduceByKey
Hi All, I am trying to understand what is going on in my simple WordCount Spark Streaming application. Here is the setup - I have a Kafka producer that is streaming words (lines of text). On the flip side, I have a spark streaming application that uses the high-level Kafka/Spark connector to read in these messages from the kafka topic. The code is straightforward - Using CDH5.1.3 distribution and submitting the job to a yarn cluster SparkConf sparkConf = new SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount); sparkConf.set(spark.shuffle.manager, SORT); sparkConf.set(spark.streaming.unpersist, true); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); MapString, String kafkaConf = new HashMapString, String(); kafkaConf.put(zookeeper.connect, zookeeper); kafkaConf.put(group.id, consumerGrp); kafkaConf.put(auto.offset.reset, largest); kafkaConf.put(zookeeper.conection.timeout.ms, 1000); kafkaConf.put(rebalance.max.retries, 20); kafkaConf.put(rebalance.backoff.ms, 3); MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords()); wordCount.print(); jssc.start(); jssc.awaitTermination(); return 0; If I remove the code (highlighted) JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords());, the application works just fine... The moment I introduce the reduceBykey, I start getting the following error and spark streaming shuts down - 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) . . 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2 java.util.NoSuchElementException: key not found: Stage 2 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645) . . *My assumption as to why it is failing is the following - * The producer application is not continuously streaming data. There are periods where there is no data being produced. On the flip side, Spark Streaming is generating DStream every one second. This DStreams is comprised of RDDs with no data associated with them. Hence, I am wondering if this would cause the reduceByKey transformation to throw an exception... Here are some general questions - (1) What happens when there is no data in the stream In terms of DStream and underlying RDD? (2) Since DStreams are just a wrapper around all individual RDD for a particular slice of time, I am assuming that these RDD are associated with an empty dataset. Is this correct? (3) What is a generally acceptable solution to weed out these RDD that do not have any data associated with them. Regards, - Jacob
Re: Spark Streaming Empty DStream / RDD and reduceByKey
Yeah... it totally should be... There is nothing fancy in there - import org.apache.spark.api.java.function.Function2; public class ReduceWords implements Function2Integer, Integer, Integer { private static final long serialVersionUID = -6076139388549335886L; public Integer call(Integer first, Integer second){ return first + second; } } On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch java...@gmail.com wrote: Is ReduceWords serializable? 2014-10-14 16:11 GMT-07:00 Abraham Jacob abe.jac...@gmail.com: Hi All, I am trying to understand what is going on in my simple WordCount Spark Streaming application. Here is the setup - I have a Kafka producer that is streaming words (lines of text). On the flip side, I have a spark streaming application that uses the high-level Kafka/Spark connector to read in these messages from the kafka topic. The code is straightforward - Using CDH5.1.3 distribution and submitting the job to a yarn cluster SparkConf sparkConf = new SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount); sparkConf.set(spark.shuffle.manager, SORT); sparkConf.set(spark.streaming.unpersist, true); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); MapString, String kafkaConf = new HashMapString, String(); kafkaConf.put(zookeeper.connect, zookeeper); kafkaConf.put(group.id, consumerGrp); kafkaConf.put(auto.offset.reset, largest); kafkaConf.put(zookeeper.conection.timeout.ms, 1000); kafkaConf.put(rebalance.max.retries, 20); kafkaConf.put(rebalance.backoff.ms, 3); MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap = wordMap.filter(new wordFilter()); JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords()); wordCount.print(); jssc.start(); jssc.awaitTermination(); return 0; If I remove the code (highlighted) JavaPairDStreamString, Integer wordCount = wordMap.reduceByKey(new ReduceWords());, the application works just fine... The moment I introduce the reduceBykey, I start getting the following error and spark streaming shuts down - 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) . . 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2 java.util.NoSuchElementException: key not found: Stage 2 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645) . . *My assumption as to why it is failing is the following - * The producer application is not continuously streaming data. There are periods where there is no data being produced. On the flip side, Spark Streaming is generating DStream every one second. This DStreams is comprised of RDDs
Re: Spark Streaming Empty DStream / RDD and reduceByKey
The results are no different - import org.apache.spark.api.java.function.Function2; import java.io.Serializable; public class ReduceWords implements Serializable, Function2Integer, Integer, Integer { private static final long serialVersionUID = -6076139388549335886L; public Integer call(Integer first, Integer second){ return first + second; } } Same exception -- 14/10/14 20:04:47 ERROR JobScheduler: Error running job streaming job 1413331487000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770) . 14/10/14 20:04:47 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2 java.util.NoSuchElementException: key not found: Stage 2 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) .. .. On Tue, Oct 14, 2014 at 4:56 PM, Michael Campbell michael.campb...@gmail.com wrote: Do you get any different results if you have ReduceWords actually implement java.io.Serializable? On Tue, Oct 14, 2014 at 7:35 PM, Abraham Jacob abe.jac...@gmail.com wrote: Yeah... it totally should be... There is nothing fancy in there - import org.apache.spark.api.java.function.Function2; public class ReduceWords implements Function2Integer, Integer, Integer { private static final long serialVersionUID = -6076139388549335886L; public Integer call(Integer first, Integer second){ return first + second; } } On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch java...@gmail.com wrote: Is ReduceWords serializable? 2014-10-14 16:11 GMT-07:00 Abraham Jacob abe.jac...@gmail.com: Hi All, I am trying to understand what is going on in my simple WordCount Spark Streaming application. Here is the setup - I have a Kafka producer that is streaming words (lines of text). On the flip side, I have a spark streaming application that uses the high-level Kafka/Spark connector to read in these messages from the kafka topic. The code is straightforward - Using CDH5.1.3 distribution and submitting the job to a yarn cluster SparkConf sparkConf = new SparkConf().setMaster(yarn-cluster).setAppName(Streaming WordCount); sparkConf.set(spark.shuffle.manager, SORT); sparkConf.set(spark.streaming.unpersist, true); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000)); MapString, String kafkaConf = new HashMapString, String(); kafkaConf.put(zookeeper.connect, zookeeper); kafkaConf.put(group.id, consumerGrp); kafkaConf.put(auto.offset.reset, largest); kafkaConf.put(zookeeper.conection.timeout.ms, 1000); kafkaConf.put(rebalance.max.retries, 20); kafkaConf.put(rebalance.backoff.ms, 3); MapString, Integer topicMap = new HashMapString, Integer(); topicMap.put(topic, 1); ListJavaPairDStreambyte[], String kafkaStreams = new ArrayListJavaPairDStreambyte[], String(); for(int i = 0; i numPartitions; i++) { kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class, DefaultDecoder.class, PayloadDeSerializer.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new PairFunctionTuple2byte[],String, byte[], String() { private static final long serialVersionUID = -1936810126415608167L; public Tuple2byte[], String call(Tuple2byte[], String tuple2) throws Exception { return tuple2; } } ) ); } JavaPairDStreambyte[], String unifiedStream; if (kafkaStreams.size() 1) { unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); } else { unifiedStream = kafkaStreams.get(0); } JavaDStreamString lines = unifiedStream.flatMap(new SplitLines()); JavaPairDStreamString, Integer wordMap = lines.mapToPair(new MapWords()); wordMap =