Hi Rachana, don't you have two messages on the Kafka broker?
Regards
JB

On 01/05/2016 05:14 PM, Rachana Srivastava wrote:
I have a very simple program: I read input from Kafka, save the input to a file, and count the records received. My code looks like this; when I run it I get two accumulator counts for each input.

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("zookeeper.connect", "localhost:2181");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

    final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            accum.add(1);
            return tuple2._2();
        }
    });

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
                rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
            }
            System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
            return null;
        }
    });

    jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with rdd.saveAsTextFile I get multiple accumulator counts for each input.

Thanks,
Rachana
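One thing worth noting: each output operation on an uncached RDD re-runs the whole lineage, so the accum.add(1) inside map() can fire once for the rdd.isEmpty() check and again for saveAsTextFile(). As a minimal plain-Java sketch of that effect (no Spark involved; the lazy Stream rebuilt by a Supplier stands in for an uncached RDD lineage, and the class and method names here are mine, purely for illustration):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DoubleCountDemo {
    // Rebuilds and re-runs the lazy pipeline once per "action", the way
    // Spark recomputes an uncached RDD lineage; the side-effecting counter
    // in map() is therefore bumped more than once per record overall.
    static int countAfterTwoActions(List<String> input) {
        AtomicInteger accum = new AtomicInteger(0);
        Supplier<Stream<String>> lines =
            () -> input.stream().map(s -> { accum.incrementAndGet(); return s; });

        lines.get().findAny();                     // "action" 1, like rdd.isEmpty()
        lines.get().collect(Collectors.toList());  // "action" 2, like saveAsTextFile()
        return accum.get();
    }

    public static void main(String[] args) {
        int n = countAfterTwoActions(List.of("a", "b", "c"));
        // The map side effect ran more times than there are records.
        System.out.println("counter = " + n + " for 3 records");
    }
}
```

In Spark terms, persisting the RDD (or counting in foreachRDD on the driver instead of inside a transformation) would avoid the recomputation, since the Spark docs themselves only guarantee exactly-once accumulator updates inside actions, not transformations.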
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org