Hi Rachana,

don't you have two messages on the Kafka broker?

Regards
JB

On 01/05/2016 05:14 PM, Rachana Srivastava wrote:
I have a very simple two-line program.  I read input from Kafka, save the
input to a file, and count the records received.  My code looks like this;
when I run it, the accumulator is incremented twice for each input.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        accum.add(1);
        return tuple2._2();
      }
    });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
      rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
    }
    System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
    return null;
  }
});

jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with
rdd.saveAsTextFile I get multiple accumulator increments for each input.
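
My guess (not confirmed) is that rdd.isEmpty() and rdd.saveAsTextFile() are
two separate actions, so the map that calls accum.add(1) may be recomputed
once per action when the RDD is not cached.  Below is the same foreachRDD
body (reusing lines and accum from the code above) with a cache() call added
before the two actions -- just a sketch of how I was thinking of checking
that idea, not something I have verified:

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    // Assumption: caching keeps the mapped data in memory so the second
    // action reuses it instead of re-running the map (and accum.add(1)).
    rdd.cache();
    if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
      rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
    }
    System.out.println(" COUNT OF ACCUMULATOR IS " + accum.value());
    rdd.unpersist();  // release the cached data once both actions are done
    return null;
  }
});

Does that reasoning sound right?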

Thanks,

Rachana


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

