Hi All,

I figured out what the problem was. Thank you, Sean, for pointing me in the
right direction. All the jibber-jabber about empty DStreams / RDDs was pure
nonsense. I guess the sequence of events (Spark Streaming started crashing
right after I implemented the reduceByKey) and reading the log file led me
to believe that there was something wrong with the way I implemented the
"reduceByKey". In fact, there was nothing wrong with the reduceByKey
implementation. Just for closure (no pun intended), I will try to explain
what happened. Maybe it will help someone else in the future.


Initially, my driver code had this definition -

SparkConf sparkConf = new SparkConf()
        .setMaster("yarn-cluster")
        .setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
// One-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Settings for the high-level Kafka consumer
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "20");
kafkaConf.put("rebalance.backoff.ms", "30000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One input stream per partition; the anonymous PairFunction is an identity map
List<JavaPairDStream<byte[], String>> kafkaStreams =
        new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
                        throws Exception {
                    return tuple2;
                }
            }));
}

// Merge the per-partition streams into a single stream
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0),
            kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
wordMap.print();

jssc.start();
jssc.awaitTermination();


The above code does not have a reduceByKey. All I was doing was printing
the pairs [<String>, 1], and things worked perfectly fine. I started
Spark Streaming and then started the Kafka producer, and I could see the
results in the logs. So far so good.

Then I proceeded to introduce the "reduceByKey", to count the words in each
batch. I created a ReduceWords.java file with the class ReduceWords with
the following definition.

public class ReduceWords implements Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer i1, Integer i2) throws Exception {
        return i1 + i2;
    }

}

and in my driver code, I introduced reduceByKey as follows -

...
...
...
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

List<JavaPairDStream<byte[], String>> kafkaStreams =
        new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
                        throws Exception {
                    return tuple2;
                }
            }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0),
            kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();

jssc.start();
jssc.awaitTermination();

This is when I started getting the exceptions and Spark started to crash.
So my instinct was to presume that something about reduceByKey was at
fault. Then Sean suggested that a reference to the containing class may
have been serialized in the closure. But ReduceWords is just a regular
class in its own Java file. It is not an inner or anonymous class. This was
what stumped me. I just could not figure out how ReduceWords could
reference the driver class in any shape or form.

The problem, it turns out, was the following -

...
...
...
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

List<JavaPairDStream<byte[], String>> kafkaStreams =
        new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            // >>> the anonymous class that caused the problem starts here
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
                        throws Exception {
                    return tuple2;
                }
            }));
            // <<< the anonymous class ends here
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0),
            kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
wordMap = wordMap.filter(new wordFilter());
JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
wordCount.print();

jssc.start();
jssc.awaitTermination();

The anonymous PairFunction passed to mapToPair in the code above (the same
code that worked perfectly fine without the reduceByKey) was what was
causing the problem. I then created a separate Java file with this class in
it -

public class MapInputs
        implements PairFunction<Tuple2<byte[], String>, byte[], String> {

    private static final long serialVersionUID = -5646014065605843441L;

    public Tuple2<byte[], String> call(Tuple2<byte[], String> input)
            throws Exception {
        return input;
    }
}

and then removed the anonymous class -

for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new MapInputs()));
}


When I ran this, it worked perfectly fine. Removing the original
anonymous class .mapToPair(new PairFunction<Tuple2<byte[],String>,
byte[], String>() { ... }) did the trick.
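
To see where the hidden reference lives, here is a minimal sketch in plain
Java (no Spark involved). SerializableFn, TopLevelIdentity,
HiddenReferenceDemo and syntheticFieldTypes are names made up for this
illustration; SerializableFn only stands in for Spark's PairFunction. The
sketch assumes the anonymous class is created inside an instance
(non-static) method of the driver, which the code above does not show
either way. In that case the compiler adds a synthetic field (javac names
it this$0) that points at the enclosing object, and reflection can list it:

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spark's PairFunction: a function type that is Serializable.
interface SerializableFn<T, R> extends Serializable {
    R call(T input) throws Exception;
}

// Top-level class, analogous to MapInputs: it has no enclosing instance.
class TopLevelIdentity implements SerializableFn<String, String> {
    private static final long serialVersionUID = 1L;

    public String call(String input) {
        return input;
    }
}

// Hypothetical stand-in for the driver class.
class HiddenReferenceDemo {

    // Anonymous class created in an instance method (the assumption stated above).
    SerializableFn<String, String> anonymousIdentity() {
        return new SerializableFn<String, String>() {
            private static final long serialVersionUID = 2L;

            public String call(String input) {
                return input;
            }
        };
    }

    // Collects the simple type names of all compiler-generated (synthetic) fields.
    static List<String> syntheticFieldTypes(Object o) {
        List<String> types = new ArrayList<String>();
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.isSynthetic()) {
                types.add(f.getType().getSimpleName());
            }
        }
        return types;
    }

    public static void main(String[] args) {
        System.out.println("anonymous: "
                + syntheticFieldTypes(new HiddenReferenceDemo().anonymousIdentity()));
        System.out.println("top-level: "
                + syntheticFieldTypes(new TopLevelIdentity()));
    }
}
```

The list for the anonymous class should contain the enclosing type, even
though call() never touches any member of it, while the list for the
top-level class should not.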

What I still don't understand is why the anonymous class worked fine
without the reduceByKey, whereas the same anonymous class with the
reduceByKey caused Spark to serialize the driver. I still don't know the
answer to this question. Maybe someone from the community can answer.
Not knowing the internals of Spark, I can only speculate that introducing
'reduceByKey' caused the Spark optimizer to create a different DAG, which
in turn caused the serialization of the driver.

Anyway, lesson learned: "be EXTRA CAREFUL with closures, especially inner
classes and anonymous classes".
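
The lesson can be reproduced with plain Java serialization, which is what
Spark's default closure serializer uses. Below is a minimal sketch, not the
actual driver: SerFn, DriverDemo, NestedIdentity and trySerialize are
hypothetical names, and DriverDemo is deliberately not Serializable, like
the KafkaStreamingWordCount class named in the exception. It also shows two
alternatives to moving the function into its own file: a static nested
class, or creating the anonymous class inside a static method.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Spark's PairFunction: a function type that is Serializable.
interface SerFn<T, R> extends Serializable {
    R call(T input) throws Exception;
}

// Hypothetical stand-in for the driver class; note that it is NOT Serializable.
class DriverDemo {

    // Static nested class: no hidden reference to a DriverDemo instance.
    static class NestedIdentity implements SerFn<String, String> {
        private static final long serialVersionUID = 1L;

        public String call(String input) {
            return input;
        }
    }

    // Anonymous class created in an instance method: it drags `this` along.
    SerFn<String, String> anonymousInInstanceMethod() {
        return new SerFn<String, String>() {
            private static final long serialVersionUID = 2L;

            public String call(String input) {
                return input;
            }
        };
    }

    // Anonymous class created in a static method: there is no `this` to capture.
    static SerFn<String, String> anonymousInStaticMethod() {
        return new SerFn<String, String>() {
            private static final long serialVersionUID = 3L;

            public String call(String input) {
                return input;
            }
        };
    }

    // Returns "ok" if the object serializes, otherwise a short failure description.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "not serializable: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance-method anonymous: "
                + trySerialize(new DriverDemo().anonymousInInstanceMethod()));
        System.out.println("static-method anonymous:   "
                + trySerialize(anonymousInStaticMethod()));
        System.out.println("static nested class:       "
                + trySerialize(new NestedIdentity()));
    }
}
```

Only the first case should fail, because serializing the anonymous object
also tries to serialize the non-serializable enclosing instance. Calling a
helper like trySerialize on a function object before handing it to Spark is
a cheap way to catch this kind of problem locally.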



Regards,
-Jacob





On Tue, Oct 14, 2014 at 11:35 PM, Sean Owen <so...@cloudera.com> wrote:

> The problem is not ReduceWords, since it is already Serializable by
> implementing Function2. Indeed the error tells you just what is
> unserializable: KafkaStreamingWordCount, your driver class.
>
> Something is causing a reference to the containing class to be
> serialized in the closure. The best fix is to not do this. Usually the
> culprit is an inner class, possibly anonymous, that is non-static.
> These contain a hidden reference to the containing class, through
> which you may be referring to one of its members. If not, it's still
> possible the closure cleaner isn't removing the reference even though
> it could.
>
> Is ReduceWords actually an inner class?
>
> Or on another tangent, when you remove reduceByKey, you are also
> removing print? that would cause it to do nothing, which of course
> generates no error.
>
> On Wed, Oct 15, 2014 at 12:11 AM, Abraham Jacob <abe.jac...@gmail.com>
> wrote:
> >
> > Hi All,
> >
> > I am trying to understand what is going on in my simple WordCount Spark
> > Streaming application. Here is the setup -
> >
> > I have a Kafka producer that is streaming words (lines of text). On the
> > flip side, I have a spark streaming application that uses the high-level
> > Kafka/Spark connector to read in these messages from the kafka topic. The
> > code is straightforward  -
> > Using CDH5.1.3 distribution and submitting the job to a yarn cluster
> >
> >
> > SparkConf sparkConf = new
> > SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
> > sparkConf.set("spark.shuffle.manager", "SORT");
> > sparkConf.set("spark.streaming.unpersist", "true");
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> > Duration(1000));
> > Map<String, String> kafkaConf = new HashMap<String, String>();
> > kafkaConf.put("zookeeper.connect", zookeeper);
> > kafkaConf.put("group.id", consumerGrp);
> > kafkaConf.put("auto.offset.reset", "largest");
> > kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
> > kafkaConf.put("rebalance.max.retries", "20");
> > kafkaConf.put("rebalance.backoff.ms", "30000");
> > Map<String, Integer> topicMap = new HashMap<String, Integer>();
> > topicMap.put(topic, 1);
> > List<JavaPairDStream<byte[], String>> kafkaStreams = new
> > ArrayList<JavaPairDStream<byte[], String>>();
> > for(int i = 0; i < numPartitions; i++) {
> > kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
> > DefaultDecoder.class, PayloadDeSerializer.class,
> > kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new
> > PairFunction<Tuple2<byte[],String>, byte[], String>() {
> >
> > private static final long serialVersionUID = -1936810126415608167L;
> >
> > public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws
> > Exception {
> > return tuple2;
> > }
> > }
> > )
> > );
> > }
> >
> > JavaPairDStream<byte[], String> unifiedStream;
> > if (kafkaStreams.size() > 1) {
> > unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
> > kafkaStreams.size()));
> > } else {
> > unifiedStream = kafkaStreams.get(0);
> > }
> > JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
> > JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
> > wordMap = wordMap.filter(new wordFilter());
> > JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new
> > ReduceWords());
> > wordCount.print();
> > jssc.start();
> > jssc.awaitTermination();
> > return 0;
> >
> > If I remove the code (highlighted) "JavaPairDStream<String, Integer>
> > wordCount = wordMap.reduceByKey(new ReduceWords());", the application
> > works just fine...
> > The moment I introduce the "reduceBykey", I start getting the following
> > error and spark streaming shuts down -
> >
> > 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >         .....
> > .....
> >
> > 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
> > java.util.NoSuchElementException: key not found: Stage 2
> >         at scala.collection.MapLike$class.default(MapLike.scala:228)
> >         at scala.collection.AbstractMap.default(Map.scala:58)
> >         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
> >         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
> > .....
> > .....
> >
> >
> > My assumption as to why it is failing is the following -
> >
> > The producer application is not continuously streaming data. There are
> > periods where there is no data being produced.  On the flip side, Spark
> > Streaming is generating DStream every one second. This DStreams is
> > comprised of RDDs with no data associated with them. Hence, I am wondering
> > if this would cause the "reduceByKey" transformation to throw an
> > exception...
> >
> > Here are some general questions -
> > (1) What happens when there is no data in the stream.... In terms of
> > DStream and underlying RDD?
> > (2) Since DStreams are just a wrapper around all individual RDD for a
> > particular slice of time, I am assuming that these RDD are associated
> > with an empty dataset. Is this correct?
> > (3) What is a generally acceptable solution to weed out these RDD that do
> > not have any data associated with them.
> >
> >
> > Regards,
> > - Jacob
>



-- 
~
