Thanks a lot for your prompt response. I am pushing only one message.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2181");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);
final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            LOG.info("#################  Input json stream data  ################# " + tuple2._2);
            accum.add(1);
            return tuple2._2();
        }
    });
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
            rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
        }
        System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
        return null;
    }
});
jssc.start();

If I remove the saveAsTextFile call I get the correct count; with that line in 
place I get double counting.



Here is the log output with the saveAsTextFile statement. Please see the double 
counting below:



&&&&&&&&&&&&&&&&&&&&&&& BEFORE COUNT OF ACCUMULATOR IS &&&&&&&&&&&&&&& 0

INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at 
KafkaURLStreaming.java:90

INFO : org.apache.spark.scheduler.DAGScheduler - Got job 0 (foreachRDD at 
KafkaURLStreaming.java:90) with 1 output partitions

INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 
0(foreachRDD at KafkaURLStreaming.java:90)

INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()

INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()

INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83), which has no missing 
parents

INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(3856) called with 
curMem=0, maxMem=1893865881

INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0 stored as 
values in memory (estimated size 3.8 KB, free 1806.1 MB)

INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2225) called with 
curMem=3856, maxMem=1893865881

INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0_piece0 stored 
as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)

INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in 
memory on localhost:51637 (size: 2.2 KB, free: 1806.1 MB)

INFO : org.apache.spark.SparkContext - Created broadcast 0 from broadcast at 
DAGScheduler.scala:861

INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks 
from ResultStage 0 (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83)

INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 
1 tasks

INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 
0.0 (TID 0, localhost, ANY, 2026 bytes)

INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 
0)

INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, 
partition 0 offsets 36 -> 37

INFO : kafka.utils.VerifiableProperties - Verifying properties

INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is 
overridden to 1073741824

INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to

INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is 
overridden to localhost:2181

INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #################  
Input json stream data  ################# one test message

INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID 
0). 972 bytes result sent to driver

INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (foreachRDD at 
KafkaURLStreaming.java:90) finished in 0.133 s

INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 
0.0 (TID 0) in 116 ms on localhost (1/1)

INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, 
whose tasks have all completed, from pool

INFO : org.apache.spark.scheduler.DAGScheduler - Job 0 finished: foreachRDD at 
KafkaURLStreaming.java:90, took 0.496657 s

INFO : org.apache.spark.ContextCleaner - Cleaned accumulator 2

INFO : org.apache.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 
on localhost:51637 in memory (size: 2.2 KB, free: 1806.1 MB)

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.tip.id is 
deprecated. Instead, use mapreduce.task.id

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.id is 
deprecated. Instead, use mapreduce.task.attempt.id

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.is.map is 
deprecated. Instead, use mapreduce.task.ismap

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.task.partition 
is deprecated. Instead, use mapreduce.task.partition

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.job.id is 
deprecated. Instead, use mapreduce.job.id

INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at 
KafkaURLStreaming.java:90

INFO : org.apache.spark.scheduler.DAGScheduler - Got job 1 (foreachRDD at 
KafkaURLStreaming.java:90) with 1 output partitions

INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 
1(foreachRDD at KafkaURLStreaming.java:90)

INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()

INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()

INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 1 
(MapPartitionsRDD[2] at foreachRDD at KafkaURLStreaming.java:90), which has no 
missing parents

INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(97104) called 
with curMem=0, maxMem=1893865881

INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as 
values in memory (estimated size 94.8 KB, free 1806.0 MB)

INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(32204) called 
with curMem=97104, maxMem=1893865881

INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored 
as bytes in memory (estimated size 31.4 KB, free 1806.0 MB)

INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in 
memory on localhost:51637 (size: 31.4 KB, free: 1806.1 MB)

INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at 
DAGScheduler.scala:861

INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks 
from ResultStage 1 (MapPartitionsRDD[2] at foreachRDD at 
KafkaURLStreaming.java:90)

INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 
1 tasks

INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 
1.0 (TID 1, localhost, ANY, 2026 bytes)

INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 
1)

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.output.dir is 
deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir

INFO : org.apache.hadoop.conf.Configuration.deprecation - 
mapred.output.key.class is deprecated. Instead, use 
mapreduce.job.output.key.class

INFO : org.apache.hadoop.conf.Configuration.deprecation - 
mapred.output.value.class is deprecated. Instead, use 
mapreduce.job.output.value.class

INFO : org.apache.hadoop.conf.Configuration.deprecation - mapred.working.dir is 
deprecated. Instead, use mapreduce.job.working.dir

INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, 
partition 0 offsets 36 -> 37

INFO : kafka.utils.VerifiableProperties - Verifying properties

INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is 
overridden to 1073741824

INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to

INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is 
overridden to localhost:2181

INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #################  
Input json stream data  ################# one test message

INFO : org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved 
output of task 'attempt_201601050824_0001_m_000000_1' to 
hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text/_temporary/0/task_201601050824_0001_m_000000

INFO : org.apache.spark.mapred.SparkHadoopMapRedUtil - 
attempt_201601050824_0001_m_000000_1: Committed

INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 
1). 933 bytes result sent to driver

INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at 
KafkaURLStreaming.java:90) finished in 0.758 s

INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at 
KafkaURLStreaming.java:90, took 0.888585 s

INFO : org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 
1.0 (TID 1) in 760 ms on localhost (1/1)

INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, 
whose tasks have all completed, from pool

 &&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 2



But if I comment out the saveAsTextFile call, I get the correct count of one for 
each input.


INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2227) called with 
curMem=9937, maxMem=1893865881
INFO : org.apache.spark.storage.MemoryStore - Block broadcast_1_piece0 stored 
as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
INFO : org.apache.spark.storage.BlockManagerInfo - Added broadcast_1_piece0 in 
memory on localhost:58397 (size: 2.2 KB, free: 1806.1 MB)
INFO : org.apache.spark.SparkContext - Created broadcast 1 from broadcast at 
DAGScheduler.scala:861
INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks 
from ResultStage 1 (MapPartitionsRDD[3] at map at KafkaURLStreaming.java:83)
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 1.0 with 
1 tasks
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 
1.0 (TID 1, localhost, ANY, 2026 bytes)
INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 1.0 (TID 
1)
INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11, 
partition 0 offsets 37 -> 38
INFO : kafka.utils.VerifiableProperties - Verifying properties
INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes is 
overridden to 1073741824
INFO : kafka.utils.VerifiableProperties - Property group.id is overridden to
INFO : kafka.utils.VerifiableProperties - Property zookeeper.connect is 
overridden to localhost:2181
INFO : com.markmonitor.antifraud.ce.KafkaURLStreaming - #################  
Input json stream data  ################# one test message without saveAs
INFO : org.apache.spark.executor.Executor - Finished task 0.0 in stage 1.0 (TID 
1). 987 bytes result sent to driver
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (foreachRDD at 
KafkaURLStreaming.java:90) finished in 0.103 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 1 finished: foreachRDD at 
KafkaURLStreaming.java:90, took 0.151210 s

&&&&&&&&&&&&&&&&&&&&& AFTER COUNT OF ACCUMULATOR IS 1
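
One possible reading of the two logs above, offered as a hedged sketch rather than a confirmed diagnosis: with saveAsTextFile present, two jobs run against the same uncached RDD (job 0 appears to come from rdd.isEmpty(), job 1 from saveAsTextFile), and each one recomputes the map function, so accum.add(1) runs twice for the single message. The plain-Java simulation below does not use Spark at all. The class LazyRecomputeDemo and its methods are hypothetical names, a Supplier of a Stream stands in for the lazily evaluated RDD, and an AtomicInteger stands in for the accumulator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyRecomputeDemo {

    // Stand-in for the accumulator: counts how often the map function runs.
    static final AtomicInteger accum = new AtomicInteger();

    // Stand-in for the uncached "lines" RDD (an assumption, not Spark code):
    // every call to get() re-runs the map function over the input.
    static Supplier<Stream<String>> lazyLines(List<String> input) {
        return () -> input.stream().map(s -> {
            accum.incrementAndGet();
            return s;
        });
    }

    // Two "actions" on the uncached pipeline: an emptiness check, then a save.
    static int countWithoutCache(List<String> input) {
        accum.set(0);
        Supplier<Stream<String>> lines = lazyLines(input);
        boolean nonEmpty = lines.get().findFirst().isPresent(); // action 1
        if (nonEmpty) {
            lines.get().collect(Collectors.toList());           // action 2
        }
        return accum.get();
    }

    // Materialize the pipeline once, then run both "actions" on the result.
    static int countWithCache(List<String> input) {
        accum.set(0);
        List<String> cached = lazyLines(input).get().collect(Collectors.toList());
        if (!cached.isEmpty()) {
            List<String> saved = new ArrayList<>(cached); // pretend save
        }
        return accum.get();
    }

    public static void main(String[] args) {
        List<String> oneMessage = Collections.singletonList("one test message");
        System.out.println(countWithoutCache(oneMessage)); // prints 2
        System.out.println(countWithCache(oneMessage));    // prints 1
    }
}
```

In Spark the analogous change would presumably be to call rdd.cache() before the first action, or to do the counting inside a single action; neither has been tested against the program above.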





-----Original Message-----
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net]
Sent: Tuesday, January 05, 2016 8:21 AM
To: user@spark.apache.org
Subject: Re: Double Counting When Using Accumulators with Spark Streaming



Hi Rachana,



Don't you have two messages on the Kafka broker?



Regards

JB



On 01/05/2016 05:14 PM, Rachana Srivastava wrote:

> I have a very simple two lines program.  I am getting input from Kafka
> and save the input in a file and counting the input received.  My code
> looks like this, when I run this code I am getting two accumulator
> count for each input.
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "localhost:9092");
> kafkaParams.put("zookeeper.connect", "localhost:2181");
>
> JavaPairInputDStream<String, String> messages =
>     KafkaUtils.createDirectStream(jssc, String.class, String.class,
>     StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>
> final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
>
> JavaDStream<String> lines = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>         public String call(Tuple2<String, String> tuple2) {
>             accum.add(1);
>             return tuple2._2();
>         }
>     });
>
> lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     public Void call(JavaRDD<String> rdd) throws Exception {
>         if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
>             rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
>         }
>         System.out.println(" &&&&&&&&&&&&&&&&&&&&& COUNT OF ACCUMULATOR IS " + accum.value());
>         return null;
>     }
> });
>
> jssc.start();
>
> If I comment rdd.saveAsTextFile I get correct count, but with
> rdd.saveAsTextFile for each input I am getting multiple accumulator count.
>
> Thanks,
>
> Rachana
>



--

Jean-Baptiste Onofré

jbono...@apache.org<mailto:jbono...@apache.org>

http://blog.nanthrax.net

Talend - http://www.talend.com


