Yeah... it totally should be... There is nothing fancy in there -
import org.apache.spark.api.java.function.Function2;

public class ReduceWords implements Function2<Integer, Integer, Integer> {
    private static final long serialVersionUID = -6076139388549335886L;

    public Integer call(Integer first, Integer second) {
        return first + second;
    }
}

On Tue, Oct 14, 2014 at 4:16 PM, Stephen Boesch <java...@gmail.com> wrote:

> Is ReduceWords serializable?
>
> 2014-10-14 16:11 GMT-07:00 Abraham Jacob <abe.jac...@gmail.com>:
>
>> Hi All,
>>
>> I am trying to understand what is going on in my simple WordCount Spark
>> Streaming application. Here is the setup -
>>
>> I have a Kafka producer that is streaming words (lines of text). On the
>> other side, I have a Spark Streaming application that uses the high-level
>> Kafka/Spark connector to read these messages from the Kafka topic. The
>> code is straightforward. I am using the CDH 5.1.3 distribution and
>> submitting the job to a YARN cluster.
>>
>> SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster")
>>     .setAppName("Streaming WordCount");
>> sparkConf.set("spark.shuffle.manager", "SORT");
>> sparkConf.set("spark.streaming.unpersist", "true");
>> JavaStreamingContext jssc =
>>     new JavaStreamingContext(sparkConf, new Duration(1000));
>>
>> Map<String, String> kafkaConf = new HashMap<String, String>();
>> kafkaConf.put("zookeeper.connect", zookeeper);
>> kafkaConf.put("group.id", consumerGrp);
>> kafkaConf.put("auto.offset.reset", "largest");
>> kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
>> kafkaConf.put("rebalance.max.retries", "20");
>> kafkaConf.put("rebalance.backoff.ms", "30000");
>>
>> Map<String, Integer> topicMap = new HashMap<String, Integer>();
>> topicMap.put(topic, 1);
>>
>> List<JavaPairDStream<byte[], String>> kafkaStreams =
>>     new ArrayList<JavaPairDStream<byte[], String>>();
>> for (int i = 0; i < numPartitions; i++) {
>>     kafkaStreams.add(
>>         KafkaUtils.createStream(jssc, byte[].class, String.class,
>>             DefaultDecoder.class, PayloadDeSerializer.class,
>>             kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
>>         .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
>>             private static final long serialVersionUID = -1936810126415608167L;
>>
>>             public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
>>                     throws Exception {
>>                 return tuple2;
>>             }
>>         }));
>> }
>>
>> JavaPairDStream<byte[], String> unifiedStream;
>> if (kafkaStreams.size() > 1) {
>>     unifiedStream = jssc.union(kafkaStreams.get(0),
>>         kafkaStreams.subList(1, kafkaStreams.size()));
>> } else {
>>     unifiedStream = kafkaStreams.get(0);
>> }
>>
>> JavaDStream<String> lines = unifiedStream.flatMap(new SplitLines());
>> JavaPairDStream<String, Integer> wordMap = lines.mapToPair(new MapWords());
>> wordMap = wordMap.filter(new wordFilter());
>> JavaPairDStream<String, Integer> wordCount =
>>     wordMap.reduceByKey(new ReduceWords());
>> wordCount.print();
>>
>> jssc.start();
>> jssc.awaitTermination();
>> return 0;
>>
>> If I remove the line "JavaPairDStream<String, Integer> wordCount =
>> wordMap.reduceByKey(new ReduceWords());", the application works just
>> fine...
>> The moment I introduce the "reduceByKey", I start getting the following
>> error and Spark Streaming shuts down -
>>
>> 14/10/14 17:58:45 ERROR JobScheduler: Error running job streaming job
>> 1413323925000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> not serializable: java.io.NotSerializableException: KafkaStreamingWordCount
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     .....
>>     .....
>>
>> 14/10/14 17:58:45 ERROR DAGSchedulerEventProcessActor: key not found:
>> Stage 2
>> java.util.NoSuchElementException: key not found: Stage 2
>>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>     at scala.collection.AbstractMap.default(Map.scala:58)
>>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:646)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1$$anonfun$apply$16.apply(DAGScheduler.scala:645)
>>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:645)
>>     .....
>>     .....
>>
>> *My assumption as to why it is failing is the following -*
>>
>> The producer application is not continuously streaming data; there are
>> periods when no data is being produced. Meanwhile, Spark Streaming keeps
>> generating a DStream every second, and those DStreams are made up of RDDs
>> with no data in them. I am wondering whether this could cause the
>> "reduceByKey" transformation to throw an exception.
>>
>> Here are some general questions -
>> (1) What happens when there is no data in the stream, in terms of the
>> DStream and the underlying RDDs?
>> (2) Since a DStream is just a wrapper around the individual RDDs for a
>> particular slice of time, I am assuming that those RDDs hold an empty
>> dataset. Is this correct?
>> (3) What is a generally accepted way to weed out the RDDs that have no
>> data associated with them?
>>
>> Regards,
>> - Jacob
>>
>
> --
~
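
For anyone hitting the same trace: the NotSerializableException names the
driver class (KafkaStreamingWordCount), not ReduceWords. That usually means
one of the functions was written as an anonymous inner class, which carries
a hidden reference to its enclosing instance, so Spark tries to serialize
the whole driver object along with the closure. The anonymous PairFunction
inside the createStream loop fits that pattern. A minimal sketch of the
usual fix, assuming the driver class looks roughly like the one in the post
(the nested class name here is illustrative):

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class KafkaStreamingWordCount {

    // A static nested class (like the top-level ReduceWords) compiles to a
    // standalone type with no implicit reference to the enclosing driver
    // instance, so only the function object itself is serialized and
    // shipped to the executors.
    public static class IdentityPair
            implements PairFunction<Tuple2<byte[], String>, byte[], String> {
        private static final long serialVersionUID = 1L;

        public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) {
            return tuple2;
        }
    }
}

With that in place the loop uses .mapToPair(new IdentityPair()), and the
stage closure no longer drags KafkaStreamingWordCount in. Marking the driver
class Serializable would also make the error go away, but it serializes far
more state than intended.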
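
On questions (1)-(3): a batch fires every interval whether or not messages
arrived, so a quiet producer simply yields RDDs with zero elements, and
reduceByKey over an empty RDD returns another empty RDD rather than
throwing. The empty batches are therefore unlikely to be the cause here. If
you still want to skip them, one common pattern is an emptiness check inside
foreachRDD. A sketch against the Spark 1.x Java API (the class name is
illustrative; newer releases also offer rdd.isEmpty()):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

// Kept as its own serializable class for the same reason as above.
public class SkipEmptyBatches
        implements Function<JavaPairRDD<String, Integer>, Void> {
    private static final long serialVersionUID = 1L;

    public Void call(JavaPairRDD<String, Integer> rdd) {
        // take(1) materializes at most one element, so it is a cheap
        // emptiness probe compared to count() on a large batch.
        if (rdd.take(1).isEmpty()) {
            return null; // this batch interval carried no data
        }
        System.out.println("non-empty batch with " + rdd.count() + " pairs");
        return null;
    }
}

This would be wired up as wordCount.foreachRDD(new SkipEmptyBatches()); in
place of (or alongside) wordCount.print();.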