Hi, I added the checkpoint directory and am now using updateStateByKey():
import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            Integer newSum = ... // add the new values with the previous running count to get the new count
            return Optional.of(newSum);
        }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

But I didn't understand what exactly I should assign in

Integer newSum = ... // add the new values with the previous running count to get the new count

Thanks and regards,
Shweta Jadhav

-----VISHNU SUBRAMANIAN <johnfedrickena...@gmail.com> wrote: -----
To: Jadhav Shweta <jadhav.shw...@tcs.com>
From: VISHNU SUBRAMANIAN <johnfedrickena...@gmail.com>
Date: 02/02/2015 04:39PM
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Java Kafka Word Count Issue

You can use updateStateByKey() to perform the above operation.

On Mon, Feb 2, 2015 at 4:29 PM, Jadhav Shweta <jadhav.shw...@tcs.com> wrote:

Hi Sean,

The Kafka producer is working fine; this is related to Spark. How can I configure Spark so that it remembers the count from the beginning?

If my log.text file has

spark
apache
kafka
spark

my Spark program gives the correct output:

spark 2
apache 1
kafka 1

But when I append "spark" to my log.text file, the program outputs

spark 1

when it should be spark 3. How do I handle this in the Spark code?

Thanks and regards,
Shweta Jadhav

-----Sean Owen <so...@cloudera.com> wrote: -----
To: Jadhav Shweta <jadhav.shw...@tcs.com>
From: Sean Owen <so...@cloudera.com>
Date: 02/02/2015 04:13PM
Subject: Re: Java Kafka Word Count Issue

This is a question about the Kafka producer, right? Not Spark.

On Feb 2, 2015 10:34 AM, "Jadhav Shweta" <jadhav.shw...@tcs.com> wrote:

Hi All,

I am trying to run the Kafka word count program; please find the link for it below:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java

I have set the Spark master to setMaster("local[*]") and started a Kafka producer that reads a file. If the file already contains a few words, then after running the Spark Java program I get the proper output. But when I append new words to the same file, the word count starts again from 1. What changes do I need to make in the code so that the count covers both the words already present and the newly appended ones?

P.S. I am using Spark spark-1.2.0-bin-hadoop2.3

Thanks and regards,
Shweta Jadhav
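
On the newSum question, here is a minimal sketch of one way to complete the update function, assuming the Guava Optional API that Spark 1.2's Java binding uses: sum the values that arrived for the key in the current batch and add them to the previous running count, defaulting to 0 for a key that has no prior state.

    import java.util.List;
    import com.google.common.base.Optional;
    import org.apache.spark.api.java.function.Function2;

    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                Integer newSum = state.or(0);   // previous running count, or 0 for a new key
                for (Integer value : values) {  // one entry per occurrence of the key in this batch
                    newSum += value;
                }
                return Optional.of(newSum);     // stored as this key's state for the next batch
            }
        };

With this, a key's state accumulates across batches, so appending "spark" to the file should produce "spark 3" rather than "spark 1".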
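On the "remember the count from the beginning" part: updateStateByKey() only works if a checkpoint directory is set on the streaming context before the job starts. Below is a sketch of the driver wiring based on the linked JavaKafkaWordCount example, using updateStateByKey() in place of that example's per-batch reduceByKey(); the checkpoint path, batch interval, Zookeeper address, consumer group, and topic name are placeholder assumptions, not values from this thread, and updateFunction is the Function2 from the sketch above.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import scala.Tuple2;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    SparkConf conf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[*]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

    // Required by updateStateByKey(); without it the job fails at runtime.
    // The path is a placeholder.
    jssc.checkpoint("/tmp/spark-checkpoint");

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1); // topic -> receiver thread count (placeholders)
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, "localhost:2181", "word-count-group", topics);

    // Split each Kafka message's value into words.
    JavaDStream<String> words = messages.flatMap(
        new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterable<String> call(Tuple2<String, String> message) {
                return Arrays.asList(message._2().split(" "));
            }
        });

    // Map each word to a (word, 1) pair.
    JavaPairDStream<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

    // Running total per word across all batches, not just the current one.
    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
    runningCounts.print();

    jssc.start();
    jssc.awaitTermination();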