Thanks a lot for your prompt response. No, I am pushing only one message.
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

// Count every record as it passes through the map transformation.
JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        LOG.info("################# Input json stream data ################# " + tuple2._2());
        accum.add(1);
        return tuple2._2();
      }
    });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
      rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
    }
    System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
    return null;
  }
});

jssc.start();

If I remove the saveAsTextFile call I get the correct count; with that line I get double counting. Here is the trace with the saveAsTextFile statement, please see the double counting below:

&&&&&&&&&&&&&&&&&&&&&&& BEFORE COUNT OF ACCUMULATOR IS &&&&&&&&&&&&&&& 0
INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at KafkaURLStreaming.java:90
INFO : org.apache.spark.scheduler.DAGScheduler - Got job 0 (foreachRDD at KafkaURLStreaming.java:90) with 1 output partitions
INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 0 (foreachRDD at KafkaURLStreaming.java:90)
INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 0 (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83), which has no missing parents
INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(3856) called with curMem=0, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 3.8 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2225) called with curMem=3856, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on localhost:51637 (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 0 from broadcast at DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 36 -> 37
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - ################# Input json stream data ################# one test message
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID 0). 972 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.133 s
INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0 (TID 0) in 116 ms on localhost (1/1)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO : org.apache.spark.scheduler.DAGScheduler - Job 0 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.496657 s
INFO : org.apache.spark.ContextCleaner - Cleaned accumulator 2
INFO : org.apache.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on localhost:51637 in memory (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.tip.id is deprecated. Instead, use mapreduce.task.id
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.job.id is deprecated. Instead, use mapreduce.job.id
INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at KafkaURLStreaming.java:90
INFO : org.apache.spark.scheduler.DAGScheduler - Got job 1 (foreachRDD at KafkaURLStreaming.java:90) with 1 output partitions
INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90)
INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 1 (MapPartitionsRDD[2] at foreachRDD at KafkaURLStreaming.java:90), which has no missing parents
INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(97104) called with curMem=0, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 94.8 KB, free 1806.0 MB)
INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(32204) called with curMem=97104, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 31.4 KB, free 1806.0 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:51637 (size: 31.4 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[2] at foreachRDD at KafkaURLStreaming.java:90)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 36 -> 37
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - ################# Input json stream data ################# one test message
INFO : org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_201601050824_0001_m_000000_1' to hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text/_temporary/0/task_201601050824_0001_m_000000
INFO : org.apache.spark.mapred.SparkHadoopMapRedUtil - attempt_201601050824_0001_m_000000_1: Committed
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 933 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.758 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.888585 s
INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 760 ms on localhost (1/1)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool

&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 2
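Looking at this first trace, the same Kafka offsets (36 -> 37) are computed twice: Job 0 is triggered by the rdd.isEmpty() check (which itself runs a job over the map stage at KafkaURLStreaming.java:83), and Job 1 by saveAsTextFile. Each job re-executes the un-cached map stage, so accum.add(1) fires once per job, which would explain the count of 2. If that recomputation is indeed the cause, caching the batch RDD before the first action should restore the correct count. A minimal sketch under that assumption (same variables as the code above; not a verified fix for your setup):

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    // Cache the batch so both actions below reuse the computed partitions
    // instead of re-reading Kafka and re-running map(), where accum.add(1) lives.
    rdd.cache();
    // Note: the original "|| !rdd.partitions().isEmpty()" clause is true even for
    // an empty batch (a direct-stream RDD always has partitions), so it is dropped here.
    if (!rdd.isEmpty()) { // first action: materializes and caches the RDD
      // Second action: reads the cached blocks, so map() does not run again.
      rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
    }
    System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
    rdd.unpersist(); // release the cached blocks once the batch is finished
    return null;
  }
});

More generally, Spark only guarantees exactly-once application of accumulator updates made inside actions; updates made inside transformations such as map() can be re-applied whenever a stage is recomputed or a task is retried, so counting inside the foreachRDD action itself (for example via rdd.count()) would be the more robust approach.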
For comparison, if I comment out the saveAsTextFile call I get the correct count of one for each input:

INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2227) called with curMem=9937, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:58397 (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at KafkaURLStreaming.java:83)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 37 -> 38
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - ################# Input json stream data ################# one test message without saveAs
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 987 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.103 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.151210 s

&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Tuesday, January 05, 2016 8:21 AM
To: user@spark.apache.org
Subject: Re: Double Counting When Using Accumulators with Spark Streaming

Hi Rachana,

don't you have two messages on the Kafka broker?

Regards
JB

On 01/05/2016 05:14 PM, Rachana Srivastava wrote:
> I have a very simple two-line program. I get input from Kafka, save
> the input in a file, and count the inputs received. When I run this
> code I get two accumulator counts for each input.
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "localhost:9092");
> kafkaParams.put("zookeeper.connect", "localhost:2181");
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
>
> JavaDStream<String> lines = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>       public String call(Tuple2<String, String> tuple2) {
>         accum.add(1);
>         return tuple2._2();
>       }
>     });
>
> lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>   public Void call(JavaRDD<String> rdd) throws Exception {
>     if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
>       rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
>     }
>     System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
>     return null;
>   }
> });
>
> jssc.start();
>
> If I comment out rdd.saveAsTextFile I get the correct count, but with
> rdd.saveAsTextFile I get multiple accumulator counts for each input.
>
> Thanks,
>
> Rachana

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com