I have a very simple two-line program: it reads input from Kafka, saves the input to a file, and counts the records received. My code looks like this; when I run it, I get two accumulator counts for each input.
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("zookeeper.connect", "localhost:2181");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

    final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

    JavaDStream<String> lines = messages.map(
            new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                    accum.add(1);
                    return tuple2._2();
                }
            });

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
                rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
            }
            System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
            return null;
        }
    });

    jssc.start();
jssc.start();
If I comment out rdd.saveAsTextFile I get the correct count, but with rdd.saveAsTextFile in place the accumulator is incremented multiple times for each input.
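My suspicion (which may be wrong) is that this happens because both rdd.isEmpty() and rdd.saveAsTextFile are actions, and each action re-runs the uncached map() lineage, so accum.add(1) fires once per action per record. The Spark-free sketch below mimics that behavior with a toy lazy dataset; LazyDataset, LazyMapped, and DoubleCountDemo are made-up names for illustration, not Spark APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy "lazy dataset": like an uncached RDD, the map function is not run
// when map() is called, but once per ACTION invoked on the result.
class LazyDataset<T> {
    private final List<T> source;
    LazyDataset(List<T> source) { this.source = source; }

    <R> LazyMapped<T, R> map(Function<T, R> f) {
        return new LazyMapped<>(source, f);   // transformation: merely recorded
    }
}

class LazyMapped<T, R> {
    private final List<T> source;
    private final Function<T, R> f;
    LazyMapped(List<T> source, Function<T, R> f) { this.source = source; this.f = f; }

    // Action #1 (think rdd.isEmpty()/count()): evaluates f on every element.
    long count() {
        long n = 0;
        for (T t : source) { f.apply(t); n++; }
        return n;
    }

    // Action #2 (think rdd.saveAsTextFile): evaluates f on every element AGAIN.
    List<R> collect() {
        List<R> out = new ArrayList<>();
        for (T t : source) out.add(f.apply(t));
        return out;
    }
}

public class DoubleCountDemo {
    public static void main(String[] args) {
        AtomicInteger accum = new AtomicInteger(0);   // stand-in for the Spark accumulator
        LazyMapped<String, String> lines =
                new LazyDataset<>(List.of("msg1", "msg2", "msg3"))
                        .map(s -> { accum.incrementAndGet(); return s; });

        lines.count();    // first action: accum = 3
        lines.collect();  // second action re-runs the map: accum = 6
        System.out.println("accumulator = " + accum.get());   // prints "accumulator = 6"
    }
}
```

If that is what is happening, is the expected fix to cache/persist the mapped stream (or move the counting off the map) so the accumulator is not incremented once per action?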
Thanks,
Rachana