Re: Spark Streaming Empty DStream / RDD and reduceByKey

2014-10-15 Thread Sean Owen
The problem is not ReduceWords, since it is already Serializable by
implementing Function2. Indeed the error tells you just what is
unserializable: KafkaStreamingWordCount, your driver class.

Something is causing a reference to the containing class to be
serialized in the closure. The best fix is to not do this. Usually the
culprit is an inner class, possibly anonymous, that is non-static.
These contain a hidden reference to the containing class, through
which you may be referring to one of its members. If not, it's still
possible the closure cleaner isn't removing the reference even though
it could.

Is ReduceWords actually an inner class?
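
To make that concrete, here is a minimal sketch (hypothetical class names, not your code) of the difference between a non-static inner class and a static nested or top-level one:

import org.apache.spark.api.java.function.Function2;

// Hypothetical driver class, standing in for KafkaStreamingWordCount.
public class Driver {

    // NON-static inner class: the compiler gives it a hidden Driver.this field,
    // so serializing an Add instance also tries to serialize the whole Driver,
    // which reproduces "NotSerializableException: Driver" in a reduceByKey.
    public class Add implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) { return a + b; }
    }

    // Static nested class (or a top-level class like your ReduceWords): no hidden
    // reference to Driver, so it serializes on its own.
    public static class SafeAdd implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) { return a + b; }
    }
}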

Or, on another tangent: when you remove reduceByKey, are you also
removing print()? That would cause the job to do nothing, which of
course generates no error.

On Wed, Oct 15, 2014 at 12:11 AM, Abraham Jacob abe.jac...@gmail.com wrote:

 Hi All,

 I am trying to understand what is going on in my simple WordCount Spark
 Streaming application. Here is the setup -

 I have a Kafka producer that is streaming words (lines of text). On the flip
 side, I have a spark streaming application that uses the high-level
 Kafka/Spark connector to read in these messages from the kafka topic. The
 code is straightforward  -
 Using CDH5.1.3 distribution and submitting the job to a yarn cluster


 SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
 sparkConf.set("spark.shuffle.manager", "SORT");
 sparkConf.set("spark.streaming.unpersist", "true");
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
 Map<String, String> kafkaConf = new HashMap<String, String>();
 kafkaConf.put("zookeeper.connect", zookeeper);
 kafkaConf.put("group.id", consumerGrp);
 kafkaConf.put("auto.offset.reset", "largest");
 kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
 kafkaConf.put("rebalance.max.retries", "20");
 kafkaConf.put("rebalance.backoff.ms", "3");
 Map<String, Integer> topicMap = new HashMap<String, Integer>();
 topicMap.put(topic, 1);
 List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
 for (int i = 0; i < numPartitions; i++) {
     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
         DefaultDecoder.class, PayloadDeSerializer.class,
         kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
             new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                 private static final long serialVersionUID = -1936810126415608167L;

                 public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                     return tuple2;
                 }
             }));
 }

 JavaPairDStream<byte[], String> unifiedStream;
 if (kafkaStreams.size() > 1) {
     unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
 } else {
     unifiedStream = kafkaStreams.get(0);
 }
 JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
 JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
 wordMap = wordMap.filter(new wordFilter());
 JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
 wordCount.print();
 jssc.start();
 jssc.awaitTermination();
 return 0;

 If I remove the code (highlighted) JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());, the application works just fine...
 The moment I introduce the reduceByKey, I start getting the following error and Spark Streaming shuts down -

 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job
 1413323925000 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task not
 serializable: java.io.NotSerializableException: KafkaStreamingWordCount
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 .
 .

 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage
 2
 java.util.NoSuchElementException: key not found: Stage 2
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at
 

Re: Spark Streaming Empty DStream / RDD and reduceByKey

2014-10-15 Thread Abraham Jacob
Hi All,

I figured out what the problem was. Thank you Sean for pointing me in the
right direction. All the jibber jabber about empty DStreams / RDDs was just
pure nonsense. The sequence of events (the fact that Spark Streaming started
crashing just after I implemented the reduceByKey) and reading the log file
led me to believe that there was something wrong with the way I implemented
the reduceByKey. In fact there was nothing wrong with the reduceByKey
implementation. Just for closure (no pun intended), I will try to explain
what happened. Maybe it will help someone else in the future.


Initially, my driver code had this definition -

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "3");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
            new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
wordMap.print();
jssc.start();
jssc.awaitTermination();


The above code does not have a reduceByKey. All I was doing was printing
out the pair [String, 1], and things worked perfectly fine. I started Spark
Streaming, then started the Kafka producer, and in the logs I could see the
results. So far so good.

Then I proceeded to introduce the reduceByKey to count the words in each
batch. I created a ReduceWords.java file containing the class ReduceWords,
with the following definition:

import org.apache.spark.api.java.function.Function2;

public class ReduceWords implements Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer i1, Integer i2) throws Exception {
        return i1 + i2;
    }

}

and in my driver code, I introduced reduceByKey as follows -

...
...
...
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
            new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();
jssc.start();
jssc.awaitTermination();

This is when I started getting the exceptions and Spark started to crash.
So my instinct was to presume that something about reduceByKey was at
fault. Then Sean pointed me to the idea that a reference to the containing
class may have been serialized in the closure. But ReduceWords is just a
regular class in its own Java file; it is not an inner or anonymous class.
This was what stumped me. I just could not figure out how ReduceWords could
reference the driver class in any shape or form.
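
As an aside, a quick way to check this sort of thing outside of Spark is to serialize the function object with plain java.io, which is essentially what Spark does when it ships the closure to the executors. A small standalone sketch (nothing Spark-specific in it, and the class name is just for illustration):

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializationCheck {
    public static void main(String[] args) throws Exception {
        // Serialize the function object the same way Spark's closure serializer would.
        // If it, or anything it references (e.g. an enclosing driver instance), is not
        // serializable, this throws java.io.NotSerializableException naming the culprit.
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(new ReduceWords());
        out.close();
        System.out.println("ReduceWords serialized cleanly");
    }
}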

The problem, it turns out, was the following -

...
...
...
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new

Spark Streaming Empty DStream / RDD and reduceByKey

2014-10-14 Thread Abraham Jacob
Hi All,

I am trying to understand what is going on in my simple WordCount Spark
Streaming application. Here is the setup -

I have a Kafka producer that is streaming words (lines of text). On the
flip side, I have a spark streaming application that uses the high-level
Kafka/Spark connector to read in these messages from the kafka topic. The
code is straightforward  -
Using CDH5.1.3 distribution and submitting the job to a yarn cluster


SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "3");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
        DefaultDecoder.class, PayloadDeSerializer.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
            new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();
jssc.start();
jssc.awaitTermination();
return 0;

If I remove the code (highlighted) JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());, the application works just fine...
The moment I introduce the reduceByKey, I start getting the following error and Spark Streaming shuts down -

14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job
1413323925000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException: KafkaStreamingWordCount
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
.
.

14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage
2
java.util.NoSuchElementException: key not found: Stage 2
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
.
.


My assumption as to why it is failing is the following -

The producer application is not continuously streaming data. There are
periods where there is no data being produced. On the flip side, Spark
Streaming is generating batches every second, and the resulting DStream is
composed of RDDs with no data associated with them. Hence, I am wondering
whether this could cause the reduceByKey transformation to throw an
exception...

Here are some general questions -
(1) What happens when there is no data in the stream, in terms of the
DStream and the underlying RDDs?
(2) Since a DStream is just a wrapper around the individual RDDs for
particular slices of time, I am assuming that these RDDs are associated
with an empty dataset. Is this correct?
(3) What is a generally acceptable solution to weed out the RDDs that do
not have any data associated with them? (A rough idea is sketched below.)
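
One simple approach I can think of for (3), sketched below, is to guard inside foreachRDD and skip batches that carry no records. This is only a rough sketch against the Spark 1.x Java API; the wrapper class and method name are made up for illustration:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class SkipEmptyBatches {

    // Print only the batches whose RDD actually contains records.
    public static void printNonEmpty(JavaPairDStream<String, Integer> wordCount) {
        wordCount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {

            private static final long serialVersionUID = 1L;

            public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                // take(1) pulls at most one record; an empty result means this
                // batch carried no data, so we skip any further work for it.
                if (!rdd.take(1).isEmpty()) {
                    System.out.println(rdd.collect());
                }
                return null;
            }
        });
    }
}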


Regards,
- Jacob


Re: Spark Streaming Empty DStream / RDD and reduceByKey

2014-10-14 Thread Abraham Jacob
Yeah... it totally should be... There is nothing fancy in there -


import org.apache.spark.api.java.function.Function2;

public class ReduceWords implements Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer first, Integer second) {
        return first + second;
    }
}




On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch java...@gmail.com wrote:

 Is ReduceWords serializable?


Re: Spark Streaming Empty DStream / RDD and reduceByKey

2014-10-14 Thread Abraham Jacob
The results are no different -

import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;

public class ReduceWords implements Serializable, Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer first, Integer second) {
        return first + second;
    }
}


Same exception --

14/10/14 20:04:47 ERROR JobScheduler: Error running job streaming job
1413331487000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException: KafkaStreamingWordCount
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)

.

14/10/14 20:04:47 ERROR DAGSchedulerEventProcessActor: key not found: Stage
2
java.util.NoSuchElementException: key not found: Stage 2
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
..
..

On Tue, Oct 14, 2014 at 4:56 PM, Michael Campbell 
michael.campb...@gmail.com wrote:

 Do you get any different results if you have ReduceWords actually
 implement java.io.Serializable?
