Hi Sachin,

Note that "until" means the window will be retained for that period of time
after the window's start time. So when you set the time to 1 year, if there
is a message whose timestamp is 1 year + 1 sec beyond the "current stream
time", then yes, it will cause the window to be dropped.

But in practice, if you are confident that you are unlikely to receive a
message stamped 2017.Dec.12 (from your use case it seems possible that
different sources' clocks can be shifted by a bit, but not by as much as a
year, right?), then it still helps with the problem.
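For concreteness, this is what the until(...) call being discussed looks
like with the 0.10.x TimeWindows API used elsewhere in this thread (the
store name and the durations are illustrative):

    // A 1-hour hopping window advancing every 30 minutes, whose state is
    // retained for one year after each window's start time.
    TimeWindows windows = TimeWindows
        .of("demo-window", 60 * 60 * 1000L)      // window size: 1 hour
        .advanceBy(30 * 60 * 1000L)              // hop: 30 minutes
        .until(365L * 24 * 60 * 60 * 1000L);     // retention: ~1 year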
Guozhang

On Fri, Dec 9, 2016 at 8:57 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I think the window retention period does not solve the problem, it only
> delays it.
> Based on what I understand, say I set the time to 1 year using until.
> Then when I get a message with timestamp 1 year + 1 sec, it will delete
> the old windows and create new ones from that message.
> Now let us say we get the next message with timestamp 1 year - 1 sec;
> based on what you said, it will ignore this message.
>
> In my case we get messages from different sources whose clocks are not in
> sync. So overall, messages come with increasing timestamps, but for a
> short duration there is no order guarantee.
>
> So I think before deleting the older windows it should retain a small
> portion of the old windows too, so nearby older messages are not dropped.
>
> I suggest having something like windows.size.advanceBy.until.retain
> Retain will retain the periods which fall under retain ms from the upper
> bound.
>
> So a window could be defined as
> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 *
> 1000L).until(365 * 24 * 3600 * 1000L).retain(900 * 1000L)
> So when dropping older windows it will retain the ones that fall in the
> last 15 minutes.
>
> Please let me know in case I missed something on how and if at all older
> messages are dropped.
>
> Thanks
> Sachin
>
> On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Assuming your window retention period is the same as the window length,
> > then it is true that ZZ will cause the current window to be dropped.
> > And then when ZZA is received, it will not cause the old windows to be
> > re-created, but will be ignored since it is considered "expired".
> >
> > Note that you can set the window retention period much longer than the
> > window length itself, using the "until" API I mentioned above, to
> > handle any sudden future records.
> >
> > Guozhang
> >
> > On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > Right now, in order to circumvent this problem, I am using a
> > > timestamp whose values increase by a few ms as and when I get new
> > > records.
> > > So let's say I have records in order
> > > A -> lower limit TS + 1 sec
> > > B -> lower limit TS + 3 sec
> > > C -> lower limit TS + 5 sec
> > > ..
> > > Z -> upper limit TS - 1 sec
> > >
> > > Now say I get a record ZZ with ts upper limit TS + 1 sec. I assume it
> > > will drop the previous windows and create new ones based on this
> > > timestamp. Please confirm this understanding.
> > >
> > > Now let's say I get a new record ZZA with timestamp (old) upper limit
> > > TS - 1 sec. Will this again cause the new windows to be dropped and
> > > the older windows recreated fresh, with all the older aggregation
> > > done so far lost?
> > >
> > > Thanks
> > > Sachin
> > >
> > > On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Sachin,
> > > >
> > > > I am with you that ideally the windowing segmentation
> > > > implementation should be totally abstracted from users, but today
> > > > it is a bit confusing to understand. I filed a JIRA some time ago
> > > > to improve on this end:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3596
> > > >
> > > > So to your example, if a "far future record" is received whose
> > > > timestamp is beyond the current time + the retention period, it
> > > > could potentially cause the current window to be dropped.
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > I think now it all makes sense. The field I was using for the
> > > > > timestamp extractor contains timestamps which span more than a
> > > > > day's duration, and it worked for the wall clock because, for a
> > > > > short duration, the timestamps were within a day's range.
> > > > >
> > > > > I wanted to understand one thing:
> > > > > Say I have a timestamp extractor field, and as records get
> > > > > ingested, future records will have increasing values for the
> > > > > timestamp.
> > > > >
> > > > > Now let's say the default duration is one day. At a future time a
> > > > > record will have a timestamp which is greater than the initial
> > > > > day's range.
> > > > > What will happen then: will it create a new segment and then
> > > > > create windows in it for the next day's duration?
> > > > > What happens if it now gets a record from the previous day? Will
> > > > > it get discarded, or will it again have just the single value
> > > > > aggregated in it (previous values are lost)?
> > > > > So when a new segment is created, as I understand it, does it
> > > > > retain the older segments' data?
> > > > >
> > > > > This is a bit confusing, so it would be helpful if you could
> > > > > explain in a bit more detail.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Sachin,
> > > > > >
> > > > > > One thing to note is that the retention of the windowed stores
> > > > > > works by keeping multiple segments of the stores, where each
> > > > > > segment stores a time range which can potentially span multiple
> > > > > > windows. A new segment needs to be created if a new window is
> > > > > > further than the oldest segment's time range + the retention
> > > > > > period (from your code it seems you do not override it from
> > > > > > TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L)
> > > > > > via until(...), so the default of one day is used).
> > > > > >
> > > > > > So with WallclockTimestampExtractor, since it is using system
> > > > > > time, it won't give you timestamps that span more than a day
> > > > > > during a short period of time; but if your own defined
> > > > > > timestamps exceed that value, then old segments will be dropped
> > > > > > immediately and hence the aggregate values will be returned as
> > > > > > a single value.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > >
> > > > > > > The extractor is used in
> > > > > > > org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
> > > > > > >
> > > > > > > Let us know if you could resolve the problem or need more
> > > > > > > help.
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > > > > https://github.com/SOHU-Co/kafka-node/ this is the node js
> > > > > > > > client I am using. The version is 0.5.x. Can you please
> > > > > > > > tell me what code in streams calls the timestamp extractor.
> > > > > > > > I can look there to see if there is any issue.
> > > > > > > >
> > > > > > > > Again, the issue happens only when producing the messages
> > > > > > > > using a producer that is compatible with kafka version
> > > > > > > > 0.8.x. I see that this producer does not send a record
> > > > > > > > timestamp, as this was introduced in version 0.10 only.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
> > > > > > > >
> > > > > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
> > > > > > > >
> > > > > > > >> I am not sure what is happening. That's why it would be
> > > > > > > >> good to have a toy example to reproduce the issue.
> > > > > > > >>
> > > > > > > >> What do you mean by "Kafka node version 0.5"?
> > > > > > > >>
> > > > > > > >> -Matthias
> > > > > > > >>
> > > > > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > > > > >>> I can provide the data, but the data does not seem to be
> > > > > > > >>> the issue.
> > > > > > > >>> If I submit the same data and use the same timestamp
> > > > > > > >>> extractor using the java client with kafka version
> > > > > > > >>> 0.10.0.1, aggregation works fine.
> > > > > > > >>> I find the issue only when submitting the data with kafka
> > > > > > > >>> node version 0.5. It looks like the stream does not
> > > > > > > >>> extract the time correctly in that case.
> > > > > > > >>>
> > > > > > > >>> Thanks
> > > > > > > >>> Sachin
> > > > > > > >>>
> > > > > > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
> > > > > > > >>>
> > > > > > > >>>> Can you provide example input data (including
> > > > > > > >>>> timestamps) and the result? What is the expected result
> > > > > > > >>>> (i.e., what aggregation do you apply)?
> > > > > > > >>>>
> > > > > > > >>>> -Matthias
> > > > > > > >>>>
> > > > > > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > > > > > >>>>> Hi,
> > > > > > > >>>>> After much debugging I found an issue with the
> > > > > > > >>>>> timestamp extractor.
> > > > > > > >>>>>
> > > > > > > >>>>> If I use a custom timestamp extractor with the
> > > > > > > >>>>> following code:
> > > > > > > >>>>>
> > > > > > > >>>>> public static class MessageTimestampExtractor implements TimestampExtractor {
> > > > > > > >>>>>     public long extract(ConsumerRecord<Object, Object> record) {
> > > > > > > >>>>>         // use the embedded ts field when the payload deserializes to a Message
> > > > > > > >>>>>         if (record.value() instanceof Message) {
> > > > > > > >>>>>             return ((Message) record.value()).ts;
> > > > > > > >>>>>         } else {
> > > > > > > >>>>>             // otherwise fall back to the record's own timestamp
> > > > > > > >>>>>             return record.timestamp();
> > > > > > > >>>>>         }
> > > > > > > >>>>>     }
> > > > > > > >>>>> }
> > > > > > > >>>>>
> > > > > > > >>>>> Here the message has a long field ts which stores the
> > > > > > > >>>>> timestamp; the aggregation does not work.
> > > > > > > >>>>> Note I have checked, and ts has valid timestamp values.
> > > > > > > >>>>>
> > > > > > > >>>>> However if I replace it with, say,
> > > > > > > >>>>> WallclockTimestampExtractor, aggregation is working
> > > > > > > >>>>> fine.
> > > > > > > >>>>>
> > > > > > > >>>>> I do not understand what could be the issue here.
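For reference, a custom extractor like the one above is wired in through
the streams configuration. A minimal sketch against the 0.10.x API (the
application id, broker, and zookeeper addresses are illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ExtractorSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-aggregator"); // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");  // 0.10.0.x only
            // wire in the custom extractor shown above
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      MessageTimestampExtractor.class.getName());

            KStreamBuilder builder = new KStreamBuilder();
            // ... define the topology as in the pipeline shown below ...
            new KafkaStreams(builder, new StreamsConfig(props)).start();
        }
    }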
> > > > > > > >>>>>
> > > > > > > >>>>> Also note I am using kafka streams version 0.10.0.1 and
> > > > > > > >>>>> I am publishing messages via
> > > > > > > >>>>> https://github.com/SOHU-Co/kafka-node/ whose version is
> > > > > > > >>>>> quite old, 0.5.x.
> > > > > > > >>>>>
> > > > > > > >>>>> Let me know if there is some bug in timestamp
> > > > > > > >>>>> extraction.
> > > > > > > >>>>>
> > > > > > > >>>>> Thanks
> > > > > > > >>>>> Sachin
> > > > > > > >>>>>
> > > > > > > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> Sachin,
> > > > > > > >>>>>>
> > > > > > > >>>>>> This is indeed a bit weird, and we'd like to try to
> > > > > > > >>>>>> reproduce your issue locally. Do you have sample input
> > > > > > > >>>>>> data for us to try out?
> > > > > > > >>>>>>
> > > > > > > >>>>>> Guozhang
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hi,
> > > > > > > >>>>>>> I fixed that sorted set issue, but I am facing a
> > > > > > > >>>>>>> weird problem which I am not able to replicate.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Here is the sample problem that I could isolate:
> > > > > > > >>>>>>> My class is like this:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> public static class Message implements Comparable<Message> {
> > > > > > > >>>>>>>     public long ts;
> > > > > > > >>>>>>>     public String message;
> > > > > > > >>>>>>>     public String key;
> > > > > > > >>>>>>>     public Message() {};
> > > > > > > >>>>>>>     public Message(long ts, String message, String key) {
> > > > > > > >>>>>>>         this.ts = ts;
> > > > > > > >>>>>>>         this.key = key;
> > > > > > > >>>>>>>         this.message = message;
> > > > > > > >>>>>>>     }
> > > > > > > >>>>>>>     // note: never returns 0, so equal timestamps are ordered arbitrarily
> > > > > > > >>>>>>>     public int compareTo(Message paramT) {
> > > > > > > >>>>>>>         long ts1 = paramT.ts;
> > > > > > > >>>>>>>         return ts > ts1 ? 1 : -1;
> > > > > > > >>>>>>>     }
> > > > > > > >>>>>>> }
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> pipeline is like this:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > > > > >>>>>>>     // rekey each record by the key carried inside the message payload
> > > > > > > >>>>>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> > > > > > > >>>>>>>         public KeyValue<String, Message> apply(String key, Message value) {
> > > > > > > >>>>>>>             return new KeyValue<String, Message>(value.key, value);
> > > > > > > >>>>>>>         }
> > > > > > > >>>>>>>     })
> > > > > > > >>>>>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
> > > > > > > >>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > > > > >>>>>>>         public SortedSet<Message> apply() {
> > > > > > > >>>>>>>             return new TreeSet<Message>();
> > > > > > > >>>>>>>         }
> > > > > > > >>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > > > > >>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
> > > > > > > >>>>>>>                 SortedSet<Message> aggregate) {
> > > > > > > >>>>>>>             aggregate.add(value);
> > > > > > > >>>>>>>             return aggregate;
> > > > > > > >>>>>>>         }
> > > > > > > >>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > > > > >>>>>>>         Serdes.String(), messagesSerde)
> > > > > > > >>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > > > > >>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > > > > >>>>>>>             ...
> > > > > > > >>>>>>>         }
> > > > > > > >>>>>>>     });
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> So basically I rekey the original message into
> > > > > > > >>>>>>> another topic and then aggregate it based on that
> > > > > > > >>>>>>> key.
> > > > > > > >>>>>>> What I have observed is that when I use windowed
> > > > > > > >>>>>>> aggregation, the aggregator does not use the previous
> > > > > > > >>>>>>> aggregated value.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
> > > > > > > >>>>>>>         SortedSet<Message> aggregate) {
> > > > > > > >>>>>>>     aggregate.add(value);
> > > > > > > >>>>>>>     return aggregate;
> > > > > > > >>>>>>> }
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> So in the above function, the aggregate is an empty
> > > > > > > >>>>>>> set for every value entering the pipeline. When I
> > > > > > > >>>>>>> remove the windowed aggregation, the aggregate set
> > > > > > > >>>>>>> retains previously aggregated values.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> I am just not able to wrap my head around it. When I
> > > > > > > >>>>>>> ran this type of test locally on windows it is
> > > > > > > >>>>>>> working fine. However, a similar pipeline setup, when
> > > > > > > >>>>>>> run against production on linux, is behaving
> > > > > > > >>>>>>> strangely and always getting an empty aggregate set.
> > > > > > > >>>>>>> Any idea what could be the reason? Where should I
> > > > > > > >>>>>>> look for the problem? Does the length of the key
> > > > > > > >>>>>>> string matter here?
> > > > > > > >>>>>>> I will later try to run the same simple setup on
> > > > > > > >>>>>>> linux and see what happens. But this is a very
> > > > > > > >>>>>>> strange behavior.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks
> > > > > > > >>>>>>> Sachin
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>> Hello Sachin,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> In the implementation of SortedSet, if the object
> > > > > > > >>>>>>>> implements the Comparable interface, that compareTo
> > > > > > > >>>>>>>> function is applied in "aggregate.add(value);", and
> > > > > > > >>>>>>>> hence if it returns 0, this element will not be
> > > > > > > >>>>>>>> added, since it is a Set.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Guozhang
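To make that Set behavior concrete, here is a small standalone sketch,
independent of Kafka, showing a TreeSet silently dropping an element whose
compareTo returns 0 against an existing element:

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class TreeSetDedupDemo {
        static class Message implements Comparable<Message> {
            final long ts;
            Message(long ts) { this.ts = ts; }
            // TreeSet uses compareTo, not equals(): returning 0 marks a duplicate
            public int compareTo(Message other) {
                return ts == other.ts ? 0 : (ts > other.ts ? 1 : -1);
            }
        }

        public static void main(String[] args) {
            SortedSet<Message> set = new TreeSet<Message>();
            set.add(new Message(14));
            set.add(new Message(18));
            set.add(new Message(14)); // compareTo returns 0 -> not added
            System.out.println(set.size()); // prints 2, not 3
        }
    }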
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Hi,
> > > > > > > >>>>>>>>> What I find is that when I use a sorted set as the
> > > > > > > >>>>>>>>> aggregation, it fails to aggregate the values for
> > > > > > > >>>>>>>>> which compareTo returns 0.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> My class is like this:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> public class Message implements Comparable<Message> {
> > > > > > > >>>>>>>>>     public long ts;
> > > > > > > >>>>>>>>>     public String message;
> > > > > > > >>>>>>>>>     public Message() {};
> > > > > > > >>>>>>>>>     public Message(long ts, String message) {
> > > > > > > >>>>>>>>>         this.ts = ts;
> > > > > > > >>>>>>>>>         this.message = message;
> > > > > > > >>>>>>>>>     }
> > > > > > > >>>>>>>>>     // messages with equal timestamps compare as 0, i.e. as duplicates
> > > > > > > >>>>>>>>>     public int compareTo(Message paramT) {
> > > > > > > >>>>>>>>>         long ts1 = paramT.ts;
> > > > > > > >>>>>>>>>         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > > > > > >>>>>>>>>     }
> > > > > > > >>>>>>>>> }
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> pipeline is like this:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > > > > >>>>>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > > > > >>>>>>>>>         public SortedSet<Message> apply() {
> > > > > > > >>>>>>>>>             return new TreeSet<Message>();
> > > > > > > >>>>>>>>>         }
> > > > > > > >>>>>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > > > > >>>>>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
> > > > > > > >>>>>>>>>                 SortedSet<Message> aggregate) {
> > > > > > > >>>>>>>>>             aggregate.add(value);
> > > > > > > >>>>>>>>>             return aggregate;
> > > > > > > >>>>>>>>>         }
> > > > > > > >>>>>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > > > > >>>>>>>>>         Serdes.String(), messagesSerde)
> > > > > > > >>>>>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > > > > >>>>>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > > > > >>>>>>>>>             ...
> > > > > > > >>>>>>>>>         }
> > > > > > > >>>>>>>>>     });
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> So any message published between 10 and 20 seconds
> > > > > > > >>>>>>>>> gets aggregated in the 10 - 20 bucket, and I print
> > > > > > > >>>>>>>>> the size of the set.
> > > > > > > >>>>>>>>> However, the output I get is the following:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 14
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 1
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 18
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 2
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 11
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 3
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 17
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 4
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 14
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 4
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 15
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 5
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 12
> > > > > > > >>>>>>>>> Aggregated: key2 10 20 -> 6
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 12
> > > > > > > >>>>>>>>> Aggregated: 10 20 -> 6
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> So as you can see, any message that occurs again in
> > > > > > > >>>>>>>>> the same second, where compareTo returns 0, fails
> > > > > > > >>>>>>>>> to get aggregated in the pipeline.
> > > > > > > >>>>>>>>> Notice the ones published at 14 and 12 seconds.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Now I am not sure if the problem is with Java, i.e.
> > > > > > > >>>>>>>>> whether I should use the Comparator interface and
> > > > > > > >>>>>>>>> not Comparable for my Message object, or whether
> > > > > > > >>>>>>>>> the problem is with the Kafka stream, or with
> > > > > > > >>>>>>>>> serializing and de-serializing the set of messages.
> > > > > > > >>>>>>>>> If I replace the Set with a List, all is working
> > > > > > > >>>>>>>>> fine.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Anyway, any ideas here would be appreciated;
> > > > > > > >>>>>>>>> meanwhile, let me see what the best java practice
> > > > > > > >>>>>>>>> here is.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Thanks
> > > > > > > >>>>>>>>> Sachin
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > > > > > > >>>>>>>>>>> Say I am having a window of 60 minutes advanced
> > > > > > > >>>>>>>>>>> by 15 minutes.
> > > > > > > >>>>>>>>>>> If the stream app using the timestamp extractor
> > > > > > > >>>>>>>>>>> puts the message in one or more bucket(s), it
> > > > > > > >>>>>>>>>>> will get aggregated in those buckets.
> > > > > > > >>>>>>>>>>> I assume this statement is correct.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Yes.
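As an aside, the "one or more bucket(s)" arithmetic for hopping windows can
be sketched in plain Java: with a 60-minute window advancing by 15 minutes,
a record falls into four windows. This is a sketch mirroring the hopping-
window semantics, not Kafka's actual implementation:

    public class HoppingWindowDemo {
        public static void main(String[] args) {
            long sizeMs = 60 * 60 * 1000L;     // window size: 60 minutes
            long advanceMs = 15 * 60 * 1000L;  // hop: 15 minutes
            long ts = 50 * 60 * 1000L;         // record timestamp: minute 50

            // the earliest window start that can still contain ts,
            // aligned down to a multiple of the advance
            long firstStart = Math.max(0, ts - sizeMs + advanceMs);
            firstStart = firstStart - (firstStart % advanceMs);
            for (long start = firstStart; start <= ts; start += advanceMs) {
                System.out.println("window [" + start / 60000 + " min, "
                        + (start + sizeMs) / 60000 + " min)");
            }
            // prints windows starting at minutes 0, 15, 30, 45
        }
    }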
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> Also say, when I restart the streams application,
> > > > > > > >>>>>>>>>>> then bucket aggregation will resume from the last
> > > > > > > >>>>>>>>>>> point of halt.
> > > > > > > >>>>>>>>>>> I hope this is also correct.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Yes.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> What I noticed is that once a message was placed
> > > > > > > >>>>>>>>>>> in one bucket, that bucket was not getting new
> > > > > > > >>>>>>>>>>> messages.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> This should not happen...
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> However, when I ran a small test case replicating
> > > > > > > >>>>>>>>>>> that, it is working properly. There may be some
> > > > > > > >>>>>>>>>>> issues in the application reset.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> ...and apparently it works (as expected) in your
> > > > > > > >>>>>>>>>> small test case.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Do you have any further information that you could
> > > > > > > >>>>>>>>>> share with us so we can help you better? What's
> > > > > > > >>>>>>>>>> the difference, for example, between your "normal"
> > > > > > > >>>>>>>>>> use case and the small test case you have been
> > > > > > > >>>>>>>>>> referring to?
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> -Michael

--
-- Guozhang