Just increase the retention time so the window is not dropped and can
accept late-arriving data.
About your example: retention time specified via until() is a minimum
retention time! It can happen that a window is kept longer.
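A minimal sketch of why raising the retention helps (this models the bookkeeping only, not Kafka's actual implementation; the names are made up):

```java
// Sketch: a record is accepted while its timestamp is still inside the
// retention horizon behind the largest timestamp seen so far ("stream time").
public class RetentionSketch {
    public static boolean accepted(long recordTs, long observedStreamTime, long retentionMs) {
        return recordTs > observedStreamTime - retentionMs;
    }

    public static void main(String[] args) {
        long streamTime = 100_000L;  // largest timestamp seen so far
        long lateRecord = 20_000L;   // arrives 80 s behind stream time
        System.out.println(accepted(lateRecord, streamTime, 60_000L));  // false: dropped
        System.out.println(accepted(lateRecord, streamTime, 120_000L)); // true: kept
    }
}
```

With a 60 s retention the late record's window is already gone; doubling the retention keeps the window around long enough to accept it.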
-Matthias
On 12/12/16 11:49 PM, Sachin Mittal wrote:
Hi Sachin,
Note that "until" means that the window will be retained for that period of
time after the window starting time. So when you set the time to 1 year, if
there is a message whose timestamp is 1 year + 1 sec beyond the "current
stream time", then yes it will cause the window to be
Hi,
Right now, in order to circumvent this problem, I am using a timestamp whose
values increase by a few ms as and when I get new records.
So let's say I have records in order
A -> lower limit TS + 1 sec
B -> lower limit TS + 3 sec
C -> lower limit TS + 5 sec
..
Z -> upper limit TS - 1 sec
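The workaround above can be sketched like this (class name and step size are hypothetical):

```java
// Sketch: hand out strictly increasing synthetic timestamps, stepping a
// couple of seconds per record, within a known [lower, upper) range.
public class SyntheticTimestamps {
    private long next;
    private final long upperLimit;

    public SyntheticTimestamps(long lowerLimit, long upperLimit) {
        this.next = lowerLimit;
        this.upperLimit = upperLimit;
    }

    // Returns the next timestamp, 2 s past the previous one.
    public long nextTimestamp() {
        next += 2_000L;
        if (next >= upperLimit) {
            throw new IllegalStateException("timestamp range exhausted");
        }
        return next;
    }
}
```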
Now say I
Hello Sachin,
I am with you that ideally the windowing segmentation implementation should
be totally abstracted away from users, but today it is a bit confusing to
understand. I filed a JIRA some time ago to improve on this end:
https://issues.apache.org/jira/browse/KAFKA-3596
So to your example,
From: Guozhang Wang <wangg...@gmail.com>
Date: 12/2/16 5:48 PM (GMT-06:00) To: users@kafka.apache.org Subject: Re:
Kafka windowed table not aggregating correctly
Hi,
I think now it all makes sense. The field I was using for the timestamp
extractor contains timestamps which span more than a day's duration.
It worked with wall-clock time because, over short durations, the
timestamps stayed within a day's range.
I wanted to understand one thing:
Say I have a timestamp
Sachin,
One thing to note is that the retention of the windowed stores works by
keeping multiple segments of the stores, where each segment stores a time
range which can potentially span multiple windows. If a new window needs to
be created that is further from the oldest segment's time range +
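A simplified model of that segment layout (not the actual store code):

```java
// Sketch: each segment covers a fixed time range, so one segment can hold
// several windows, and whole segments are dropped at once.
public class SegmentSketch {
    public static long segmentId(long windowStart, long segmentIntervalMs) {
        return windowStart / segmentIntervalMs;
    }

    public static void main(String[] args) {
        long interval = 60_000L; // each segment covers one minute
        // Two windows starting 45 s apart land in the same segment...
        System.out.println(segmentId(0L, interval));      // 0
        System.out.println(segmentId(45_000L, interval)); // 0
        // ...so neither can be dropped before the other is also expired.
        System.out.println(segmentId(60_000L, interval)); // 1
    }
}
```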
The extractor is used in
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
Let us know if you could resolve the problem or need more help.
-Matthias
On 12/2/16 11:46 AM, Sachin Mittal wrote:
https://github.com/SOHU-Co/kafka-node/ this is the Node.js client I am
using. The version is 0.5x. Can you please tell me what code in Streams
calls the timestamp extractor? I can look there to see if there is any
issue.
Again, the issue happens only when producing the messages using the producer that is
I am not sure what is happening. That's why it would be good to have a
toy example to reproduce the issue.
What do you mean by "Kafka node version 0.5"?
-Matthias
On 12/2/16 11:30 AM, Sachin Mittal wrote:
I can provide the data, but the data does not seem to be the issue.
If I submit the same data and use the same timestamp extractor with the Java
client on Kafka 0.10.0.1, aggregation works fine.
I find the issue only when submitting the data with kafka-node version 0.5.
It looks like the
Can you provide example input data (including timestamps) and the result?
What is the expected result (i.e., what aggregation do you apply)?
-Matthias
On 12/2/16 7:43 AM, Sachin Mittal wrote:
Hi,
After much debugging I found an issue with the timestamp extractor.
If I use a custom timestamp extractor with the following code:
public static class MessageTimestampExtractor implements
TimestampExtractor {
public long extract(ConsumerRecord
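For reference, a self-contained sketch of such extraction logic (the payload class and field name are assumptions; in the real application this lives inside TimestampExtractor.extract()):

```java
// Sketch of a custom timestamp extractor's core logic.
public class ExtractorSketch {
    // Hypothetical payload carrying its own event time.
    public static class Message {
        public final long ts;
        public Message(long ts) { this.ts = ts; }
    }

    // Return the embedded event time; fall back to wall-clock time for
    // payloads that carry none, rather than returning a negative value.
    public static long extract(Object value) {
        if (value instanceof Message) {
            return ((Message) value).ts;
        }
        return System.currentTimeMillis();
    }
}
```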
Sachin,
This is indeed a bit weird, and we'd like to try to reproduce your issue
locally. Do you have some sample input data for us to try out?
Guozhang
On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal wrote:
Hi,
I fixed that sorted set issue but I am facing a weird problem which I am
not able to replicate.
Here is the sample problem that I could isolate:
My class is like this:
public static class Message implements Comparable {
public long ts;
public String message;
public
Hello Sachin,
In the implementation of SortedSet, if the objects implement the
Comparable interface, that compareTo function is applied in
"aggregate.add(value);", and hence if it returns 0, the element will not be
added, since it is a Set.
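This can be reproduced with a plain TreeSet, independent of Kafka:

```java
import java.util.TreeSet;

// Two distinct messages whose compareTo returns 0 collapse to one element.
public class SortedSetPitfall {
    public static class Message implements Comparable<Message> {
        public final long ts;
        public final String message;

        public Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }

        // Ordering by timestamp only: two different messages with the same
        // ts compare as equal, so a TreeSet keeps just one of them.
        public int compareTo(Message other) {
            return Long.compare(this.ts, other.ts);
        }
    }

    public static void main(String[] args) {
        TreeSet<Message> aggregate = new TreeSet<Message>();
        aggregate.add(new Message(1L, "first"));
        aggregate.add(new Message(1L, "second")); // compareTo == 0: not added
        System.out.println(aggregate.size()); // 1
    }
}
```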
Guozhang
On Mon, Nov 21, 2016 at 10:06 PM,
Hi,
What I find is that when I use a sorted set as the aggregation, it fails to
aggregate the values whose compareTo returns 0.
My class is like this:
public class Message implements Comparable {
public long ts;
public String message;
public Message() {};
On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal wrote:
I am using kafka_2.10-0.10.0.1.
Say I have a window of 60 minutes advanced by 15 minutes.
If the stream app using timestamp extractor puts the message in one or more
bucket(s), it will get aggregated in those buckets.
I assume this statement is correct.
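The bucket assignment above can be checked with simple arithmetic (helper names are made up):

```java
import java.util.ArrayList;
import java.util.List;

// With hopping windows of size 60 min advancing by 15 min, one timestamp
// belongs to size/advance = 4 windows.
public class HoppingWindows {
    // Start times of all windows [start, start + size) containing ts.
    public static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<Long>();
        long last = (ts / advance) * advance;      // latest window containing ts
        for (long s = last; s > ts - size && s >= 0; s -= advance) {
            starts.add(0, s);                      // keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // A record at minute 70 falls into windows starting at minutes 15, 30, 45, 60.
        System.out.println(windowStartsFor(70 * min, 60 * min, 15 * min));
    }
}
```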
Also say when I restart the streams
Hello Sachin,
Which version of Kafka are you using for this application?
Guozhang
On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal wrote:
Hi,
I have a simple pipeline
stream.aggregateByKey(new Initializer<List<Message>>() {
    public List<Message> apply() {
        return new ArrayList<Message>();
    }
}, new Aggregator<String, Message, List<Message>>() {
    public List<Message> apply(String key, Message value, List<Message> list) {
        list.add(value);
        return list;
    }
}, keysSerde, valuesSerde, "table");
So this