Hello all,

Can we create and use a JavaRDD while processing a stream, from Kafka for example?
The following code throws a serialization exception, and I'm not sure whether
this is feasible at all.

    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);

    // Keep only the value of each (key, value) Kafka record
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaPairDStream<String, String> wordCounts = lines.mapToPair(
        new PairFunction<String, String, String>() {
          public Tuple2<String, String> call(String urlString) {
            // This call() runs on the executors, but jsc (the JavaSparkContext)
            // exists only on the driver and is not serializable; this is the
            // likely source of the serialization exception.
            String propertiesFile =
                "/home/cloudera/Desktop/sample/input/featurelist.properties";
            JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile);
            JavaPairRDD<String, String> featureKeyClassPair = propertiesFileRDD.mapToPair(
                new PairFunction<String, String, String>() {
                  public Tuple2<String, String> call(String property) {
                    String[] parts = property.split("=");
                    return new Tuple2<String, String>(parts[0], parts[1]);
                  }
                });
            featureKeyClassPair.count();
            // featureScore is not defined in this snippet
            return new Tuple2<String, String>(urlString, featureScore);
          }
        });
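A common way around this is to build the lookup table once on the driver, outside any DStream function, and ship only plain data to the executors instead of calling jsc.textFile per record. The sketch below is a minimal, hypothetical helper (the class name FeatureList and its methods are made up for illustration) that assumes featurelist.properties holds one key=value pair per line and is readable from the driver's local file system. It covers only the parsing; the Spark wiring is left out.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;

// Hypothetical helper: turns "key=value" lines into a plain HashMap on the
// driver. HashMap is Serializable, so it can be shipped to executors.
public class FeatureList {

  // Parse already-read lines; lines without '=' are skipped.
  public static HashMap<String, String> parse(List<String> lines) {
    HashMap<String, String> features = new HashMap<>();
    for (String line : lines) {
      // Limit 2 keeps any further '=' characters inside the value
      String[] parts = line.split("=", 2);
      if (parts.length == 2) {
        // Assumption: surrounding whitespace is not significant
        features.put(parts[0].trim(), parts[1].trim());
      }
    }
    return features;
  }

  // Read the file once, on the driver, from its local file system.
  public static HashMap<String, String> load(String path) throws IOException {
    return parse(Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8));
  }
}
```

On the driver, the resulting map could then be wrapped with jsc.broadcast(...) before the DStream pipeline is built, and the mapToPair function would read it through Broadcast.value() rather than touching jsc.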
