Hi,

I added a checkpoint directory and am now using updateStateByKey():

import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
    @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
      Integer newSum = ...  // add the new values with the previous running count to get the new count
      return Optional.of(newSum);
    }
  };
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

But I don't understand what exactly I should assign to newSum in the line
Integer newSum = ...  // add the new values with the previous running count to get the new count
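For reference, one way to fill in that line is to start from the previous count (or 0 when the key has no state yet) and add every value from the current batch. The sketch below is only an illustration under assumptions: it uses java.util.Optional as a stand-in so that it compiles without Spark or Guava on the classpath, and the class name RunningCount and the method name update are hypothetical. With Guava's com.google.common.base.Optional, as imported in the snippet above, the equivalent of state.orElse(0) would be state.or(0).

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-alone helper mirroring the body of call() in updateFunction.
class RunningCount {
    // values: the counts that arrived for one key in the current batch.
    // state:  the previous running count, absent the first time the key is seen.
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        // Assumption: java.util.Optional is used here; Guava's Optional would use state.or(0).
        int newSum = state.orElse(0);
        for (Integer value : values) {
            newSum += value;  // add each new value to the running count
        }
        return Optional.of(newSum);
    }
}
```

With the example from this thread, the first batch for "spark" would call update([1, 1], empty) and give 2; after one more "spark" is appended, the next batch would call update([1], 2) and give 3.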


Thanks and regards
Shweta Jadhav



-----VISHNU SUBRAMANIAN <johnfedrickena...@gmail.com> wrote: -----
To: Jadhav Shweta <jadhav.shw...@tcs.com>
From: VISHNU SUBRAMANIAN <johnfedrickena...@gmail.com>
Date: 02/02/2015 04:39PM
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Java Kafka Word Count Issue

You can use updateStateByKey() to perform the above operation.

On Mon, Feb 2, 2015 at 4:29 PM, Jadhav Shweta <jadhav.shw...@tcs.com> wrote:

Hi Sean,

Kafka Producer is working fine.
This is related to Spark.

How can I configure Spark so that it remembers the counts from the 
beginning?

If my log.text file has

spark
apache
kafka
spark

My Spark program gives the correct output:

spark 2
apache 1
kafka 1

But when I append spark to my log.text file, the Spark program gives the output

spark 1

which should be spark 3.

So how should I handle this in the Spark code?

Thanks and regards 
Shweta Jadhav



-----Sean Owen <so...@cloudera.com> wrote: -----
To: Jadhav Shweta <jadhav.shw...@tcs.com>
From: Sean Owen <so...@cloudera.com>
Date: 02/02/2015 04:13PM
Subject: Re: Java Kafka Word Count Issue

This is a question about the Kafka producer, right? Not Spark.

On Feb 2, 2015 10:34 AM, "Jadhav Shweta" <jadhav.shw...@tcs.com> wrote:

Hi All,

I am trying to run the Kafka word count program.
Please find the link to it below:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java

I have set the Spark master with setMaster("local[*]")

and I have started a Kafka producer which reads the file.

If my file already has a few words,
then after running the Spark Java program I get the proper output.

But when I append new words to the same file, it starts the word count again from 1.

If I need the word count to cover both the already present and the newly appended 
words, exactly what changes do I need to make in the code?

P.S. I am using spark-1.2.0-bin-hadoop2.3.

Thanks and regards 
Shweta Jadhav