Hi all, I recently picked up Spark and am trying to work through a
coding issue that involves the reduceByKey method.  After various debugging
efforts, it seems that the reduceByKey method never gets called.

Here's my workflow, which is followed by my code and results:

My parsed data contains three fields (val1 val2 val3), separated by spaces,
and I place the data into a JavaDStream using .flatMap().

From there I inspect the 3rd value (val3) and, on a match, place the entire
record into a DStream "bucket", since I need the correlated values for the
record.  (Later, I'll expand this to filter all of the values I need into
multiple buckets/filters.)  If I get the match I'm looking for, I pair the
record String with an Integer, so I end up with (val1 val2 val3, 1) to be
used by my reduceByKey call.
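
To make the shape of the data concrete, here's a tiny standalone sketch of
that tagging step on a made-up record (plain Java plus scala.Tuple2, no
Spark involved; the class name and values are purely illustrative):

import scala.Tuple2;

public class TagSketch {
    public static void main(String[] args) {
        String record = "12345 1.111.111.111 304"; // val1 val2 val3
        String val3 = record.split("\\s+")[2];

        // If val3 is the match I'm after (a 3xx code in this case),
        // tag the whole record with a 1 for the later reduceByKey
        if (val3.startsWith("3")) {
            Tuple2<String, Integer> tagged = new Tuple2<String, Integer>(record, 1);
            System.out.println(tagged); // prints (12345 1.111.111.111 304,1)
        }
    }
}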

From there I call the reduceByKey method, but it never seems to get invoked
(even though I am calling the print() output action on the resulting
DStream).  I can see Spark's log output showing that data is being consumed
through my Kafka implementation, but I never get a count to print.
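
For reference, here's a minimal batch-mode sketch of what I expect
reduceByKey to do with pairs like mine (plain RDD API in local mode, with a
made-up class name and records; this is not my streaming code):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

public class ReduceByKeySketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("sketch").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("12345 1.111.111.111 304", 1),
                new Tuple2<String, Integer>("12345 1.111.111.111 304", 1),
                new Tuple2<String, Integer>("67890 2.222.222.222 301", 1)));

        // Sum the 1s for identical record strings
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer a, Integer b) {
                        return a + b;
                    }
                });

        // Expect (12345 1.111.111.111 304,2) and (67890 2.222.222.222 301,1)
        for (Tuple2<String, Integer> t : counts.collect()) {
            System.out.println(t);
        }
        sc.stop();
    }
}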

Any ideas on what is going on here?

===========================
Here are the important snippets of the code I've implemented...

// Create a DStream from the Kafka broker
JavaPairDStream<String, String> messages =
        KafkaUtils.createStream(jsc, zkQuorum, group, topicMap);

// Pull the message payloads out of the Kafka (key, value) pairs
JavaDStream<String> lines = messages.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });


// Parse out the data and place it into a new DStream
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String x) {

        // Create an instance of our Kafka parser class
        KafkaParser kpObj = new KafkaParser();

        // Parse the raw line once and emit the processed records
        List<String> lastList = kpObj.getProcessingValues(
                kpObj.myKeyToValue(
                        kpObj.myFillParser(x)));

        return lastList;
    }
});

// Filter each response code into its own DStream
JavaDStream<String> responseCode3xxMap = words.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {

                // Match a space followed by a 3xx response code
                Pattern regex = Pattern.compile("\\s[3][0-9][0-9]");
                Matcher regexMatcher = regex.matcher(s);
                List<String> properRec = new ArrayList<String>();

                // Emit the whole record for every 3xx response code found
                while (regexMatcher.find()) {
                    properRec.add(s);
                }
                return properRec;
            }
        });
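
As a quick standalone sanity check of that pattern, here's a throwaway
sketch against a made-up sample record (plain java.util.regex, no Spark;
the class name is just for illustration):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        Pattern regex = Pattern.compile("\\s[3][0-9][0-9]");
        Matcher m = regex.matcher("12345 1.111.111.111 304");

        // find() locates " 304" -- note the leading space is part of the match
        while (m.find()) {
            System.out.println("matched:[" + m.group() + "]");
        }
    }
}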

// Tag each matching record with a count of 1
JavaPairDStream<String, Integer> responseCode3xxPairs = responseCode3xxMap.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

// THIS NEVER SEEMS TO GET CALLED??
JavaPairDStream<String, Integer> responseCode3xxCounts = responseCode3xxPairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        

responseCode3xxMap.print();
responseCode3xxPairs.print();
responseCode3xxCounts.print();
jsc.start();    
jsc.awaitTermination();

===================
And the basic results...

-------------------------------------------
Time: 1404341640000 ms
-------------------------------------------
12345 1.111.111.111 304
...

-------------------------------------------
Time: 1404341640000 ms
-------------------------------------------
(12345 1.111.111.111 304,1)
...


While this executes, and as I add in new data, I can see the new data being
received, but it never seems to get processed / printed:

14/07/02 16:54:00 INFO scheduler.ReceiverTracker: Stream 0 received 8 blocks
14/07/02 16:54:00 INFO scheduler.JobScheduler: Added jobs for time 1404341640000 ms
14/07/02 16:54:10 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
14/07/02 16:54:10 INFO scheduler.JobScheduler: Added jobs for time 1404341650000 ms



