Hi,
I added a checkpoint directory and am now using updateStateByKey():
import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        Integer newSum = ... // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      }
    };

JavaPairDStream<String, Integer> runningCounts =
    pairs.updateStateByKey(updateFunction);
But I didn't understand what exactly I should assign in "Integer newSum = ..."
to add the new values to the previous running count to get the new count.
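For reference, the pattern in the Spark Streaming programming guide is to start
from the previous state (zero for a new key) and add up the values from the
current batch. A minimal sketch of the call body, assuming the Guava-based
Optional that Spark 1.2 uses:

@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
  // Previous running count for this key, or 0 if the key is new.
  Integer newSum = state.or(0);
  // Add every occurrence counted for this key in the current batch.
  for (Integer value : values) {
    newSum += value;
  }
  return Optional.of(newSum);
}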
Thanks and regards
Shweta Jadhav
-----VISHNU SUBRAMANIAN <[email protected]> wrote: -----
To: Jadhav Shweta <[email protected]>
From: VISHNU SUBRAMANIAN <[email protected]>
Date: 02/02/2015 04:39PM
Cc: "[email protected]" <[email protected]>
Subject: Re: Java Kafka Word Count Issue
You can use updateStateByKey() to perform the above operation.
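A minimal sketch of the wiring, assuming a JavaStreamingContext named jssc, a
word/count pair DStream named pairs, and the update function from the
programming guide (the checkpoint path is a placeholder):

// Stateful operations need a checkpoint directory to recover state.
jssc.checkpoint("/tmp/spark-checkpoint"); // placeholder path

// runningCounts carries the cumulative count per word across batches,
// not just the counts from the current batch.
JavaPairDStream<String, Integer> runningCounts =
    pairs.updateStateByKey(updateFunction);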
On Mon, Feb 2, 2015 at 4:29 PM, Jadhav Shweta <[email protected]> wrote:
Hi Sean,
The Kafka producer is working fine.
This issue is related to Spark.
How can I configure Spark so that it remembers the count from the beginning?
If my log.text file has
spark
apache
kafka
spark
my Spark program gives the correct output:
spark 2
apache 1
kafka 1
but when I append "spark" to my log.text file, the Spark program gives the output
spark 1
which should be spark 3.
How should I handle this in the Spark code?
Thanks and regards
Shweta Jadhav
-----Sean Owen <[email protected]> wrote: -----
To: Jadhav Shweta <[email protected]>
From: Sean Owen <[email protected]>
Date: 02/02/2015 04:13PM
Subject: Re: Java Kafka Word Count Issue
This is a question about the Kafka producer, right? Not Spark.
On Feb 2, 2015 10:34 AM, "Jadhav Shweta" <[email protected]> wrote:
Hi All,
I am trying to run the Kafka Word Count program.
Please find the link for it below:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
I have set the Spark master to setMaster("local[*]"),
and I have started a Kafka producer which reads the file.
If my file already has a few words,
then after running the Spark Java program I get the proper output.
But when I append new words to the same file, the word count starts again from 1.
What changes do I need to make in the code so that the word count covers both
the already-present words and the newly appended ones?
P.S. I am using Spark spark-1.2.0-bin-hadoop2.3
Thanks and regards
Shweta Jadhav