Yeah... it totally should be... There is nothing fancy in there -

import org.apache.spark.api.java.function.Function2;

public class ReduceWords implements Function2<Integer, Integer, Integer> {

private static final long serialVersionUID = -6076139388549335886L;

public Integer call(Integer first, Integer second){
return first + second;
}
}




On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch <java...@gmail.com> wrote:

> Is ReduceWords serializable?
>
> 2014-10-14 16:11 GMT-07:00 Abraham Jacob <abe.jac...@gmail.com>:
>
>
>> Hi All,
>>
>> I am trying to understand what is going on in my simple WordCount Spark
>> Streaming application. Here is the setup -
>>
>> I have a Kafka producer that is streaming words (lines of text). On the
>> flip side, I have a spark streaming application that uses the high-level
>> Kafka/Spark connector to read in these messages from the kafka topic. The
>> code is straightforward  -
>> Using CDH5.1.3 distribution and submitting the job to a yarn cluster
>>
>>
>> SparkConf sparkConf = new
>> SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
>> sparkConf.set("spark.shuffle.manager", "SORT");
>> sparkConf.set("spark.streaming.unpersist", "true");
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
>> Duration(1000));
>>  Map<String, String> kafkaConf = new HashMap<String, String>();
>> kafkaConf.put("zookeeper.connect", zookeeper);
>> kafkaConf.put("group.id", consumerGrp);
>> kafkaConf.put("auto.offset.reset", "largest");
>> kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
>> kafkaConf.put("rebalance.max.retries", "20");
>> kafkaConf.put("rebalance.backoff.ms", "30000");
>>  Map<String, Integer> topicMap = new HashMap<String, Integer>();
>> topicMap.put(topic, 1);
>>  List<JavaPairDStream<byte[], String>> kafkaStreams = new
>> ArrayList<JavaPairDStream<byte[], String>>();
>> for(int i = 0; i < numPartitions; i++) {
>> kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>> String.class,
>> DefaultDecoder.class, PayloadDeSerializer.class,
>> kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(new
>> PairFunction<Tuple2<byte[],String>, byte[], String>() {
>>
>> private static final long serialVersionUID = -1936810126415608167L;
>>
>> public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws
>> Exception {
>> return tuple2;
>> }
>> }
>> )
>> );
>> }
>>
>> JavaPairDStream<byte[], String> unifiedStream;
>> if (kafkaStreams.size() > 1) {
>> unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1,
>> kafkaStreams.size()));
>> } else {
>> unifiedStream = kafkaStreams.get(0);
>> }
>>  JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
>> JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new
>> MapWords());
>> wordMap = wordMap.filter(new wordFilter());
>> JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new
>> ReduceWords());
>>  wordCount.print();
>> jssc.start();
>> jssc.awaitTermination();
>>  return 0;
>>
>> If I remove the code (highlighted) "JavaPairDStream<String, Integer>
>> wordCount = wordMap.reduceByKey(new ReduceWords());", the application works
>> just fine...
>> The moment I introduce the "reduceBykey", I start getting the following
>> error and spark streaming shuts down -
>>
>> 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job
>> 1413323925000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
>>         at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         .....
>> .....
>>
>> 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found:
>> Stage 2
>> java.util.NoSuchElementException: key not found: Stage 2
>>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>>         at scala.collection.AbstractMap.default(Map.scala:58)
>>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
>>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>         at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
>> .....
>> .....
>>
>>
>> *My assumption as to why it is failing is the following - *
>>
>> The producer application is not continuously streaming data. There are
>> periods where there is no data being produced.  On the flip side, Spark
>> Streaming is generating DStream every one second. This DStreams is
>> comprised of RDDs with no data associated with them. Hence, I am wondering
>> if this would cause the "reduceByKey" transformation to throw an
>> exception...
>>
>> Here are some general questions -
>> (1) What happens when there is no data in the stream.... In terms of
>> DStream and underlying RDD?
>> (2) Since DStreams are just a wrapper around all individual RDD for a
>> particular slice of time, I am assuming that these RDD are associated with
>> an empty dataset. Is this correct?
>> (3) What is a generally acceptable solution to weed out these RDD that do
>> not have any data associated with them.
>>
>>
>> Regards,
>> - Jacob
>>
>
>


-- 
~

Reply via email to