Hello all, Can we invoke JavaRDD while processing stream from Kafka for example. Following code is throwing some serialization exception. Not sure if this is feasible.
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5)); JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, group, topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaPairDStream<String, String> wordCounts = lines.mapToPair( new PairFunction<String, String, String>() { public Tuple2<String, String> call(String urlString) { String propertiesFile = "/home/cloudera/Desktop/sample/input/featurelist.properties"; JavaRDD<String> propertiesFileRDD = jsc.textFile(propertiesFile); JavaPairRDD<String, String> featureKeyClassPair = propertiesFileRDD.mapToPair( new PairFunction<String, String, String>() { public Tuple2<String, String> call(String property) { return new Tuple2(property.split("=")[0], property.split("=")[1]); } }); featureKeyClassPair.count(); return new Tuple2<String, String>(urlString, featureScore); } });