Re: Double Counting When Using Accumulators with Spark Streaming
With saveAsTextFile enabled, a single input message produces an accumulator count of two:

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 36 -> 37
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - # Input json stream data # one test message
INFO : org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_201601050824_0001_m_00_1' to hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text/_temporary/0/task_201601050824_0001_m_00
INFO : org.apache.spark.mapred.SparkHadoopMapRedUtil - attempt_201601050824_0001_m_00_1: Committed
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 933 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.758 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.888585 s
INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 1.0 (TID 1) in 760 ms on localhost (1/1)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 2

But if I comment out the saveAsTextFile call, I get the correct count of one for each input:

INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2227) called with curMem=9937, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in memory on localhost:58397 (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at KafkaURLStreaming.java:83)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 1)
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, partition 0 offsets 37 -> 38
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - # Input json stream data # one test message without saveAs
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 1). 987 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at KafkaURLStreaming.java:90) finished in 0.103 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at KafkaURLStreaming.java:90, took 0.151210 s
&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Tuesday, January 05, 2016 8:21 AM
To: user@spark.apache.org
Subject: Re: Double Counting When Using Accumulators with Spark Streaming

Hi Rachana,

don't you have two messages on the kafka broker?

Regards
JB
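These logs are consistent with the map stage running more than once when the save is enabled: rdd.isEmpty() and rdd.saveAsTextFile(...) are both actions, and Spark recomputes an uncached RDD for every action, re-running the accum.add(1) call in the map closure each time. A minimal standalone sketch of that behavior, using the Spark 1.x Java API seen in this thread (hypothetical code, not from the original job):

import java.util.Arrays;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class AccumulatorRecompute {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("accum-recompute").setMaster("local[2]"));
    final Accumulator<Integer> accum = sc.accumulator(0);

    // Side effect inside a transformation: runs once per task *execution*,
    // not once per element of the logical dataset.
    JavaRDD<String> mapped = sc.parallelize(Arrays.asList("a", "b")).map(
        new Function<String, String>() {
          public String call(String s) {
            accum.add(1);
            return s;
          }
        });

    mapped.count();  // first action: map runs, accum becomes 2
    mapped.count();  // second action: uncached RDD is recomputed, accum becomes 4
    System.out.println("accumulator = " + accum.value());  // prints 4, not 2

    sc.stop();
  }
}

Here each count() re-runs the map over both elements, so the accumulator ends at 4 for a 2-element RDD; the same pattern would make one input message count as two.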
Re: Double Counting When Using Accumulators with Spark Streaming
Hi Rachana,

don't you have two messages on the kafka broker?

Regards
JB

On 01/05/2016 05:14 PM, Rachana Srivastava wrote:
> I have a very simple two-line program. I am getting input from Kafka,
> saving the input in a file, and counting the input received. When I run
> this code I am getting two accumulator counts for each input.
> [...]

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
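One way to separate the two hypotheses (duplicate messages on the broker vs. double counting inside the job) is to log the batch size computed inside the action, where it is not affected by recomputation. A sketch reusing the accum and lines names from Rachana's code, as an illustrative variant rather than the original job:

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    rdd.cache();                  // materialize this batch once
    long records = rdd.count();  // actual number of records in the batch
    System.out.println("records in batch = " + records
        + ", accumulator = " + accum.value());
    rdd.unpersist();
    return null;
  }
});

If records comes out as 1 while the accumulator advances by 2 per message, the broker holds a single message and the extra increment is happening inside the job.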
Double Counting When Using Accumulators with Spark Streaming
I have a very simple two-line program. I am getting input from Kafka, saving the input in a file, and counting the input received. My code looks like this; when I run it, I am getting two accumulator counts for each input.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);

JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        accum.add(1);
        return tuple2._2();
      }
    });

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
      rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
    }
    System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
    return null;
  }
});

jssc.start();

If I comment out rdd.saveAsTextFile I get the correct count, but with rdd.saveAsTextFile I am getting multiple accumulator counts for each input.

Thanks,
Rachana
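Note that the foreachRDD body above invokes two actions on each batch RDD: rdd.isEmpty() and rdd.saveAsTextFile(...). On an uncached RDD, each action re-runs the upstream map, and every re-run executes accum.add(1) again, which would produce exactly the count of 2 seen in the logs. One possible workaround under that assumption is to cache the mapped stream so both actions reuse the same computed partitions; a sketch, not a verified fix:

JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        accum.add(1);
        return tuple2._2();
      }
    }).cache();  // each batch RDD is materialized once and reused by both actions

More generally, Spark only guarantees that a task's accumulator update is applied exactly once when the accumulator is updated inside an action; updates made inside a transformation such as map can be re-applied whenever the stage is recomputed or retried. The more robust pattern is to count inside foreachRDD, for example via rdd.count(), rather than in the map closure.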