Hi all, I recently picked up Spark and am trying to work through a coding issue involving the reduceByKey method. After various debugging efforts, it seems that the reduceByKey method never gets called.
Here's my workflow, followed by my code and results:

My parsed data contains three fields (val1 val2 val3), separated by spaces. I place the data into a JavaDStream using flatMap. From there I inspect the third value (val3) and place the entire string into this DStream "bucket", since I need the correlated values for the record. (Later, I'll expand this judiciously to filter all of the values I need into multiple buckets/filters.) If I get the match I'm looking for, I append an Integer to the value String, so I end up with (val1 val2 val3, 1) pairs to be used by my reduceByKey method.

From there, I call the reduceByKey method, but it doesn't seem to ever get called, even though I am calling the print action on the resulting stream. I can see Spark's log output showing that data is being consumed through my Kafka implementation, but the counts never print. Any ideas on what is going on here?

===========================

Here are the important snippets of the code I've implemented:
//Create a Kafka DStream
JavaPairDStream<String, String> messages =
        KafkaUtils.createStream(jsc, zkQuorum, group, topicMap);

//Get the message values out of the (key, value) stream
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

//Parse out the data and place it into a new DStream
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String x) {
        //Create an object for our Kafka parser class
        KafkaParser kpObj = new KafkaParser();
        return kpObj.getProcessingValues(
                kpObj.myKeyToValue(
                        kpObj.myFillParser(x)));
    }
});

//Filter each response code into its own DStream
JavaDStream<String> responseCode3xxMap = words.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        //Pull out all 3xx response codes
        Pattern regex = Pattern.compile("\\s3[0-9][0-9]");
        Matcher regexMatcher = regex.matcher(s);
        List<String> properRec = new ArrayList<String>();
        while (regexMatcher.find()) {
            properRec.add(s);
        }
        return properRec;
    }
});

JavaPairDStream<String, Integer> responseCode3xxPairs = responseCode3xxMap.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

//THIS NEVER SEEMS TO GET CALLED??
JavaPairDStream<String, Integer> responseCode3xxCounts = responseCode3xxPairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });

responseCode3xxMap.print();
responseCode3xxPairs.print();
responseCode3xxCounts.print();

jsc.start();
jsc.awaitTermination();

===================

And the basic results...

-------------------------------------------
Time: 1404341640000 ms
-------------------------------------------
12345 1.111.111.111 304
...

-------------------------------------------
Time: 1404341640000 ms
-------------------------------------------
(12345 1.111.111.111 304,1)
...

While this executes and I add in new data, I can see the new data being received, but it never seems to get processed/printed:

14/07/02 16:54:00 INFO scheduler.ReceiverTracker: Stream 0 received 8 blocks
14/07/02 16:54:00 INFO scheduler.JobScheduler: Added jobs for time 1404341640000 ms
14/07/02 16:54:10 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
14/07/02 16:54:10 INFO scheduler.JobScheduler: Added jobs for time 1404341650000 ms

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-Not-Being-Called-by-Spark-Streaming-tp8684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
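P.S. To make sure my expectation of the reduce step itself is right, here is a minimal plain-Java sketch (no Spark, just java.util.stream) of what I understand reduceByKey to do within a single batch: merge the counts of records whose entire "val1 val2 val3" string is an identical key. The class name and sample records below are made up for illustration; note that keys which appear only once in a batch would simply keep a count of 1.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ReduceByKeySketch {

    // Mimics reduceByKey over (record, 1) pairs: identical keys have
    // their counts merged with Integer::sum, unique keys stay at 1.
    static Map<String, Integer> reduceByKey(List<String> batch) {
        return batch.stream()
                .collect(Collectors.toMap(
                        record -> record,   // key: the whole record string
                        record -> 1,        // value: initial count of 1
                        Integer::sum));     // merge function for duplicate keys
    }

    public static void main(String[] args) {
        // Hypothetical sample batch with one duplicated record
        List<String> batch = Arrays.asList(
                "12345 1.111.111.111 304",
                "12345 1.111.111.111 304",
                "67890 2.222.222.222 301");

        Map<String, Integer> counts = reduceByKey(batch);
        System.out.println(counts.get("12345 1.111.111.111 304")); // 2
        System.out.println(counts.get("67890 2.222.222.222 301")); // 1
    }
}
```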