Sachin,

One thing to note is that retention of windowed stores works by keeping multiple segments per store, where each segment holds a time range that can potentially span multiple windows. If a new window needs to be created that lies beyond the oldest segment's time range plus the retention period, the oldest segments are dropped. From your code it seems you do not override the retention on TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L) via until(...), so the default of one day is used.
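A sketch of what overriding the retention could look like on the 0.10.0.x API used in this thread (the seven-day value is only an illustration, not a recommendation):

```java
// Sketch: override the default one-day retention via until(...) so that
// custom timestamps spread over a wide range do not cause the oldest
// segments (and their aggregates) to be dropped prematurely.
TimeWindows.of("stream-table", 10 * 1000L)
           .advanceBy(5 * 1000L)
           .until(7 * 24 * 60 * 60 * 1000L); // retain windows for 7 days
```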
So with WallclockTimestampExtractor, since it uses system time, it won't give you timestamps that span more than a day within a short period of time. But if your own extracted timestamps span more than that value, then old segments will be dropped immediately, and hence the aggregate value will be returned as a single value.

Guozhang

On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> The extractor is used in
>
> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>
> Let us know, if you could resolve the problem or need more help.
>
> -Matthias
>
> On 12/2/16 11:46 AM, Sachin Mittal wrote:
>> https://github.com/SOHU-Co/kafka-node/ this is the node.js client I am
>> using. The version is 0.5.x. Can you please tell me what code in streams
>> calls the timestamp extractor? I can look there to see if there is any
>> issue.
>>
>> Again, the issue happens only when producing the messages using a
>> producer that is compatible with Kafka version 0.8.x. I see that this
>> producer does not send a record timestamp, as this was introduced in
>> version 0.10 only.
>>
>> Thanks
>> Sachin
>>
>> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>
>>> I am not sure what is happening. That's why it would be good to have a
>>> toy example to reproduce the issue.
>>>
>>> What do you mean by "Kafka node version 0.5"?
>>>
>>> -Matthias
>>>
>>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>>> I can provide the data, but data does not seem to be the issue.
>>>> If I submit the same data and use the same timestamp extractor via the
>>>> Java client with Kafka version 0.10.0.1, aggregation works fine.
>>>> I find the issue only when submitting the data with kafka-node
>>>> version 0.5.
>>>> It looks like the stream does not extract the time correctly in that
>>>> case.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>
>>>>> Can you provide example input data (including timestamps) and result?
>>>>> What is the expected result (i.e., what aggregation do you apply)?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>>>>> Hi,
>>>>>> After much debugging I found an issue with the timestamp extractor.
>>>>>>
>>>>>> If I use a custom timestamp extractor with the following code:
>>>>>>
>>>>>> public static class MessageTimestampExtractor implements
>>>>>>         TimestampExtractor {
>>>>>>     public long extract(ConsumerRecord<Object, Object> record) {
>>>>>>         if (record.value() instanceof Message) {
>>>>>>             return ((Message) record.value()).ts;
>>>>>>         } else {
>>>>>>             return record.timestamp();
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Here Message has a long field ts which stores the timestamp, and the
>>>>>> aggregation does not work.
>>>>>> Note I have checked and ts has valid timestamp values.
>>>>>>
>>>>>> However if I replace it with, say, WallclockTimestampExtractor,
>>>>>> aggregation is working fine.
>>>>>>
>>>>>> I do not understand what could be the issue here.
>>>>>>
>>>>>> Also note I am using Kafka Streams version 0.10.0.1, and I am
>>>>>> publishing messages via https://github.com/SOHU-Co/kafka-node/
>>>>>> whose version is quite old, 0.5.x.
>>>>>>
>>>>>> Let me know if there is some bug in timestamp extraction.
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
>>>>>>
>>>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>>> Sachin,
>>>>>>>
>>>>>>> This is indeed a bit weird, and we'd like to try to reproduce your
>>>>>>> issue locally. Do you have sample input data for us to try out?
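Aside: a defensive variant of the extractor quoted above (a sketch only, assuming the Message class from this thread) would guard against both an invalid embedded ts and the missing record timestamp that records from pre-0.10 producers carry:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch, not the poster's code: prefer the embedded ts when it looks valid,
// then the broker/producer record timestamp, and finally wall-clock time for
// records written by pre-0.10 producers (which have no timestamp, i.e. -1).
public class SafeMessageTimestampExtractor implements TimestampExtractor {
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof Message) {
            long ts = ((Message) record.value()).ts;
            if (ts > 0) {
                return ts;
            }
        }
        long recordTs = record.timestamp();
        return recordTs >= 0 ? recordTs : System.currentTimeMillis();
    }
}
```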
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I fixed that sorted set issue, but I am facing a weird problem
>>>>>>>> which I am not able to replicate.
>>>>>>>>
>>>>>>>> Here is the sample problem that I could isolate:
>>>>>>>> My class is like this:
>>>>>>>>
>>>>>>>> public static class Message implements Comparable<Message> {
>>>>>>>>     public long ts;
>>>>>>>>     public String message;
>>>>>>>>     public String key;
>>>>>>>>     public Message() {};
>>>>>>>>     public Message(long ts, String message, String key) {
>>>>>>>>         this.ts = ts;
>>>>>>>>         this.key = key;
>>>>>>>>         this.message = message;
>>>>>>>>     }
>>>>>>>>     public int compareTo(Message paramT) {
>>>>>>>>         long ts1 = paramT.ts;
>>>>>>>>         return ts > ts1 ? 1 : -1;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The pipeline is like this:
>>>>>>>>
>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>>>>         public KeyValue<String, Message> apply(String key, Message value) {
>>>>>>>>             return new KeyValue<String, Message>(value.key, value);
>>>>>>>>         }
>>>>>>>>     })
>>>>>>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>         public SortedSet<Message> apply() {
>>>>>>>>             return new TreeSet<Message>();
>>>>>>>>         }
>>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>                 SortedSet<Message> aggregate) {
>>>>>>>>             aggregate.add(value);
>>>>>>>>             return aggregate;
>>>>>>>>         }
>>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>         Serdes.String(), messagesSerde)
>>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>             ...
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>>
>>>>>>>> So basically I rekey the original message into another topic and
>>>>>>>> then aggregate it based on that key.
>>>>>>>> What I have observed is that when I use windowed aggregation, the
>>>>>>>> aggregator does not use the previous aggregated value.
>>>>>>>>
>>>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>         SortedSet<Message> aggregate) {
>>>>>>>>     aggregate.add(value);
>>>>>>>>     return aggregate;
>>>>>>>> }
>>>>>>>>
>>>>>>>> So in the above function the aggregate is an empty set for every
>>>>>>>> value entering the pipeline. When I remove the windowed aggregation,
>>>>>>>> the aggregate set retains previously aggregated values in the set.
>>>>>>>>
>>>>>>>> I am just not able to wrap my head around it. When I ran this type
>>>>>>>> of test locally on Windows it is working fine. However a similar
>>>>>>>> pipeline setup, when run against production on Linux, is behaving
>>>>>>>> strangely and always getting an empty aggregate set.
>>>>>>>> Any idea what could be the reason, and where should I look for the
>>>>>>>> problem? Does the length of the key string matter here? I will
>>>>>>>> later try to run the same simple setup on Linux and see what
>>>>>>>> happens. But this is very strange behavior.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Sachin
>>>>>>>>
>>>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Sachin,
>>>>>>>>>
>>>>>>>>> In the implementation of SortedSet, if the objects implement the
>>>>>>>>> Comparable interface, that compareTo function is applied in
>>>>>>>>> "aggregate.add(value);", and hence if it returns 0, the element
>>>>>>>>> will not be added, since it is a Set.
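The Set behavior Guozhang describes can be reproduced with plain Java, independent of Kafka (class and field names below are illustrative):

```java
import java.util.TreeSet;

// Demonstrates that TreeSet silently drops an element whose compareTo
// returns 0 against an existing element, even when the payloads differ.
public class TreeSetDropDemo {
    static class Message implements Comparable<Message> {
        final long ts;
        final String text;
        Message(long ts, String text) { this.ts = ts; this.text = text; }
        // Same compareTo as in the thread: 0 for equal timestamps.
        public int compareTo(Message other) {
            return ts == other.ts ? 0 : (ts > other.ts ? 1 : -1);
        }
    }

    static int aggregateSize() {
        TreeSet<Message> aggregate = new TreeSet<>();
        aggregate.add(new Message(14, "a"));
        aggregate.add(new Message(18, "b"));
        aggregate.add(new Message(14, "c")); // compareTo == 0 -> not added
        return aggregate.size();
    }

    public static void main(String[] args) {
        System.out.println(aggregateSize()); // prints 2, not 3
    }
}
```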
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> What I find is that when I use a sorted set as the aggregation,
>>>>>>>>>> it fails to aggregate the values for which compareTo returns 0.
>>>>>>>>>>
>>>>>>>>>> My class is like this:
>>>>>>>>>>
>>>>>>>>>> public class Message implements Comparable<Message> {
>>>>>>>>>>     public long ts;
>>>>>>>>>>     public String message;
>>>>>>>>>>     public Message() {};
>>>>>>>>>>     public Message(long ts, String message) {
>>>>>>>>>>         this.ts = ts;
>>>>>>>>>>         this.message = message;
>>>>>>>>>>     }
>>>>>>>>>>     public int compareTo(Message paramT) {
>>>>>>>>>>         long ts1 = paramT.ts;
>>>>>>>>>>         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The pipeline is like this:
>>>>>>>>>>
>>>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>>         public SortedSet<Message> apply() {
>>>>>>>>>>             return new TreeSet<Message>();
>>>>>>>>>>         }
>>>>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>                 SortedSet<Message> aggregate) {
>>>>>>>>>>             aggregate.add(value);
>>>>>>>>>>             return aggregate;
>>>>>>>>>>         }
>>>>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>>         Serdes.String(), messagesSerde)
>>>>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>>             ...
>>>>>>>>>>         }
>>>>>>>>>>     });
>>>>>>>>>>
>>>>>>>>>> So any message published between 10 and 20 seconds gets
>>>>>>>>>> aggregated into the 10 - 20 bucket, and I print the size of
>>>>>>>>>> the set.
>>>>>>>>>> However, the output I get is the following:
>>>>>>>>>>
>>>>>>>>>> Published: 14
>>>>>>>>>> Aggregated: 10 20 -> 1
>>>>>>>>>>
>>>>>>>>>> Published: 18
>>>>>>>>>> Aggregated: 10 20 -> 2
>>>>>>>>>>
>>>>>>>>>> Published: 11
>>>>>>>>>> Aggregated: 10 20 -> 3
>>>>>>>>>>
>>>>>>>>>> Published: 17
>>>>>>>>>> Aggregated: 10 20 -> 4
>>>>>>>>>>
>>>>>>>>>> Published: 14
>>>>>>>>>> Aggregated: 10 20 -> 4
>>>>>>>>>>
>>>>>>>>>> Published: 15
>>>>>>>>>> Aggregated: 10 20 -> 5
>>>>>>>>>>
>>>>>>>>>> Published: 12
>>>>>>>>>> Aggregated: key2 10 20 -> 6
>>>>>>>>>>
>>>>>>>>>> Published: 12
>>>>>>>>>> Aggregated: 10 20 -> 6
>>>>>>>>>>
>>>>>>>>>> So as you can see, any message that occurs again within the same
>>>>>>>>>> second, where compareTo returns 0, fails to get aggregated in
>>>>>>>>>> the pipeline. Notice the ones published at 14 and 12 seconds.
>>>>>>>>>>
>>>>>>>>>> Now I am not sure if the problem is with Java, i.e. whether I
>>>>>>>>>> should use the Comparator interface and not Comparable for my
>>>>>>>>>> Message object, or whether the problem is with Kafka Streams, or
>>>>>>>>>> with serializing and deserializing the set of messages. If I
>>>>>>>>>> replace the Set with a List, all is working fine.
>>>>>>>>>>
>>>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me
>>>>>>>>>> see what the best Java practice is here.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Sachin
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>>>>>>> Say I am having a window of 60 minutes advanced by 15 minutes.
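On the "best Java practice" question raised above: one common fix (a sketch, not something proposed in the thread) is to break timestamp ties on another field, so that distinct messages never compare as equal and are therefore all kept by the TreeSet:

```java
import java.util.TreeSet;

// Tie-breaking comparison: order by ts first, then by text, so two
// messages with the same timestamp but different payloads are both kept.
// (For full correctness, equals/hashCode should be made consistent with
// compareTo as well; omitted here for brevity.)
public class TieBreakDemo {
    static class Message implements Comparable<Message> {
        final long ts;
        final String text;
        Message(long ts, String text) { this.ts = ts; this.text = text; }
        public int compareTo(Message other) {
            int byTs = Long.compare(ts, other.ts);
            return byTs != 0 ? byTs : text.compareTo(other.text);
        }
    }

    static int aggregateSize() {
        TreeSet<Message> aggregate = new TreeSet<>();
        aggregate.add(new Message(14, "a"));
        aggregate.add(new Message(18, "b"));
        aggregate.add(new Message(14, "c")); // same ts, different text -> kept
        return aggregate.size();
    }

    public static void main(String[] args) {
        System.out.println(aggregateSize()); // prints 3
    }
}
```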
>>>>>>>>>>>> If the stream app, using the timestamp extractor, puts the
>>>>>>>>>>>> message in one or more bucket(s), it will get aggregated in
>>>>>>>>>>>> those buckets.
>>>>>>>>>>>> I assume this statement is correct.
>>>>>>>>>>>
>>>>>>>>>>> Yes.
>>>>>>>>>>>
>>>>>>>>>>>> Also say when I restart the streams application, then bucket
>>>>>>>>>>>> aggregation will resume from the last point of halt.
>>>>>>>>>>>> I hope this is also correct.
>>>>>>>>>>>
>>>>>>>>>>> Yes.
>>>>>>>>>>>
>>>>>>>>>>>> What I noticed is that once a message was placed in one bucket,
>>>>>>>>>>>> that bucket was not getting new messages.
>>>>>>>>>>>
>>>>>>>>>>> This should not happen...
>>>>>>>>>>>
>>>>>>>>>>>> However when I ran a small test case replicating that, it is
>>>>>>>>>>>> working properly. There may be some issues in application reset.
>>>>>>>>>>>
>>>>>>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>>>>>>
>>>>>>>>>>> Do you have any further information that you could share with us
>>>>>>>>>>> so we can help you better? What's the difference, for example,
>>>>>>>>>>> between your "normal" use case and the small test case you have
>>>>>>>>>>> been referring to?
>>>>>>>>>>>
>>>>>>>>>>> -Michael
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang

--
-- Guozhang