The results are no different -

import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;

public class ReduceWords implements Serializable,
        Function2<Integer, Integer, Integer> {

    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer first, Integer second) {
        return first + second;
    }
}


Same exception --

14/10/14 20:04:47 ERROR JobScheduler: Error running job streaming job 1413331487000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
        ........
        .........

14/10/14 20:04:47 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
java.util.NoSuchElementException: key not found: Stage 2
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
......
......
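
One thing that stands out to me is that the exception names the enclosing
driver class (KafkaStreamingWordCount) rather than ReduceWords. Anonymous
inner classes in Java hold a reference to their enclosing instance, so my
guess is that the inline PairFunction passed to mapToPair is what drags the
driver class into the task closure. A minimal sketch of what I mean, pulling
that function out into its own class (the TupleIdentity name below is just
illustrative):

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.Serializable;

// Standalone identity function, so the task closure carries no reference
// to the enclosing driver class.
public class TupleIdentity implements Serializable,
        PairFunction<Tuple2<byte[], String>, byte[], String> {

    private static final long serialVersionUID = 1L;

    public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
            throws Exception {
        return tuple2;
    }
}

The driver would then call .mapToPair(new TupleIdentity()) instead of the
anonymous class.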


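On the question of batches with no data, here is a minimal sketch of how
empty micro-batches could be skipped. It assumes wordCount is the
JavaPairDStream<String, Integer> built above and that a per-batch
rdd.count() is an acceptable cost; the println body is only a placeholder
for the real per-batch action.

// Needs: org.apache.spark.api.java.JavaPairRDD,
// org.apache.spark.api.java.function.Function, scala.Tuple2
wordCount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {

    private static final long serialVersionUID = 1L;

    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        // Skip micro-batches that carried no Kafka messages.
        if (rdd.count() == 0) {
            return null;
        }
        // Placeholder action: print the first few counts.
        // (If serialization complains here too, this anonymous class can
        // be pulled out into its own top-level class the same way.)
        for (Tuple2<String, Integer> wc : rdd.take(10)) {
            System.out.println(wc._1() + ": " + wc._2());
        }
        return null;
    }
});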

On Tue, Oct 14, 2014 at 4:56 PM, Michael Campbell <
michael.campb...@gmail.com> wrote:

> Do you get any different results if you have ReduceWords actually
> implement java.io.Serializable?
>
> On Tue, Oct 14, 2014 at 7:35 PM, Abraham Jacob <abe.jac...@gmail.com>
> wrote:
>
>> Yeah... it totally should be... There is nothing fancy in there -
>>
>>
>> import org.apache.spark.api.java.function.Function2;
>>
>> public class ReduceWords implements Function2<Integer, Integer, Integer> {
>>
>>     private static final long serialVersionUID = -6076139388549335886L;
>>
>>     public Integer call(Integer first, Integer second) {
>>         return first + second;
>>     }
>> }
>>
>>
>>
>>
>> On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch <java...@gmail.com>
>> wrote:
>>
>>> Is ReduceWords serializable?
>>>
>>> 2014-10-14 16:11 GMT-07:00 Abraham Jacob <abe.jac...@gmail.com>:
>>>
>>>
>>>> Hi All,
>>>>
>>>> I am trying to understand what is going on in my simple WordCount Spark
>>>> Streaming application. Here is the setup -
>>>>
>>>> I have a Kafka producer that is streaming words (lines of text). On the
>>>> flip side, I have a Spark Streaming application that uses the high-level
>>>> Kafka/Spark connector to read these messages from the Kafka topic. I am
>>>> using the CDH 5.1.3 distribution and submitting the job to a YARN
>>>> cluster. The code is straightforward -
>>>>
>>>>
>>>> SparkConf sparkConf = new SparkConf()
>>>>     .setMaster("yarn-cluster")
>>>>     .setAppName("Streaming WordCount");
>>>> sparkConf.set("spark.shuffle.manager", "SORT");
>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
>>>>
>>>> Map<String, String> kafkaConf = new HashMap<String, String>();
>>>> kafkaConf.put("zookeeper.connect", zookeeper);
>>>> kafkaConf.put("group.id", consumerGrp);
>>>> kafkaConf.put("auto.offset.reset", "largest");
>>>> kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
>>>> kafkaConf.put("rebalance.max.retries", "20");
>>>> kafkaConf.put("rebalance.backoff.ms", "30000");
>>>>
>>>> Map<String, Integer> topicMap = new HashMap<String, Integer>();
>>>> topicMap.put(topic, 1);
>>>>
>>>> List<JavaPairDStream<byte[], String>> kafkaStreams =
>>>>     new ArrayList<JavaPairDStream<byte[], String>>();
>>>> for (int i = 0; i < numPartitions; i++) {
>>>>     kafkaStreams.add(
>>>>         KafkaUtils.createStream(jssc, byte[].class, String.class,
>>>>             DefaultDecoder.class, PayloadDeSerializer.class,
>>>>             kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
>>>>         .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
>>>>
>>>>             private static final long serialVersionUID = -1936810126415608167L;
>>>>
>>>>             public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
>>>>                     throws Exception {
>>>>                 return tuple2;
>>>>             }
>>>>         }));
>>>> }
>>>>
>>>> JavaPairDStream<byte[], String> unifiedStream;
>>>> if (kafkaStreams.size() > 1) {
>>>>     unifiedStream = jssc.union(kafkaStreams.get(0),
>>>>         kafkaStreams.subList(1, kafkaStreams.size()));
>>>> } else {
>>>>     unifiedStream = kafkaStreams.get(0);
>>>> }
>>>>
>>>> JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
>>>> JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
>>>> wordMap = wordMap.filter(new wordFilter());
>>>> JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new ReduceWords());
>>>>
>>>> wordCount.print();
>>>> jssc.start();
>>>> jssc.awaitTermination();
>>>>
>>>> return 0;
>>>>
>>>> If I remove the line "JavaPairDStream<String, Integer> wordCount =
>>>> wordMap.reduceByKey(new ReduceWords());", the application works just
>>>> fine. The moment I introduce the reduceByKey, I start getting the
>>>> following error and Spark Streaming shuts down -
>>>>
>>>> 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job 1413323925000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>         .....
>>>> .....
>>>>
>>>> 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found: Stage 2
>>>> java.util.NoSuchElementException: key not found: Stage 2
>>>>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>         at scala.collection.AbstractMap.default(Map.scala:58)
>>>>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
>>>>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
>>>> .....
>>>> .....
>>>>
>>>>
>>>> *My assumption as to why it is failing is the following -*
>>>>
>>>> The producer application is not continuously streaming data; there are
>>>> periods where no data is being produced. On the flip side, Spark
>>>> Streaming generates a batch every second, so the DStream is composed of
>>>> RDDs that may have no data associated with them. Hence, I am wondering
>>>> whether this could cause the reduceByKey transformation to throw an
>>>> exception...
>>>>
>>>> Here are some general questions -
>>>> (1) What happens when there is no data in the stream, in terms of the
>>>> DStream and its underlying RDDs?
>>>> (2) Since a DStream is just a wrapper around the individual RDDs for
>>>> particular slices of time, I am assuming that these RDDs are associated
>>>> with an empty dataset. Is this correct?
>>>> (3) What is a generally acceptable way to weed out the RDDs that do not
>>>> have any data associated with them?
>>>>
>>>>
>>>> Regards,
>>>> - Jacob
>>>>
>>>
>>>
>>
>>
>> --
>> ~
>>
>
>


-- 
~
