Hi Sachin,

Note that "until" means that the window will be retained for that period of
time after the window starting time. So when you set the time to 1 year, if
there is a message whose timestamp is 1 year + 1 sec beyond the "current
stream time", then yes it will cause the window to be dropped. But in
practice, if you are confident that you would not likely receive a message
stamped 2017.Dec.12 (from your use case it seems possible that different
source's clocks can be shifted by a bit, but not as much as a year right?
), then it is still helps with the problem.


Guozhang


On Fri, Dec 9, 2016 at 8:57 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I think windows retention period does not solves the problem, only delays
> it.
> Based on what I understand say I set the time to 1 year using until.
> Then when I get the message with timestamp 1 year + 1 sec it will delete
> the old windows and create new ones from that message.
> Now let us say we get next message with timestamp 1 year - 1 sec, based on
> what you said, it will ignore this message.
>
> In my case we get messages from different sources whose clocks are not in
> sync. So overall message come with increasing timestamp but for a short
> duration there is no order guarantee.
>
> So I think before deleting the older windows it should retain small portion
> of old windows too, so nearby older messages are not dropped.
>
> I suggest have something like windows.size.advanceBy.until.retain
> Retain will retain the periods which fall under retain ms from the upper
> bound.
>
> So window can be defined as
> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> 1000l).untill(365 * 24 * 3600 * 1000l).retain(900 * 1000l)
> So when dropping older windows it will retain the ones fall in last 15
> minutes.
>
>
> Please let me know in case I missed something on how and if at all older
> messages are dropped.
>
> Thanks
> Sachin
>
>
>
>
>
> On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Assuming your windows retention period is the same to the window length,
> > then it is true that ZZ will cause the current window to be dropper. And
> > then when ZZA is recieved, it will not cause the old windows to be
> > re-created but will be ignored since it is considered as "expired".
> >
> > Note that you can set the window retention period much longer than the
> > window length itself, using the "until" API I mentioned above to handle
> any
> > sudden future records.
> >
> >
> >
> > Guozhang
> >
> > On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
> >
> > > Hi,
> > > Right now in order to circumvent this problem I am using a timestamp
> > whose
> > > values increase by few ms as and when I get new records.
> > > So lets say I have records in order
> > > A -> lower limit TS + 1 sec
> > > B -> lower limit TS + 3 sec
> > > C -> lower limit TS + 5 sec
> > > ..
> > > Z -> upper limit TS - 1 sec
> > >
> > > Now say I get a record ZZ with ts upper limit TS + 1 sec I assume it
> will
> > > drop the previous windows and create new ones based on this timestamp.
> > > Please confirm this understanding.
> > >
> > > Now lets say I get new record ZZA with timestamp (old) upper limit TS
> - 1
> > > sec, will this again cause new windows to be dropped and recreate older
> > > windows fresh with all the older aggregation done so far lost?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > > On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hello Sachin,
> > > >
> > > > I am with you that ideally the windowing segmentation implementation
> > > should
> > > > be totally abstracted from users but today it is a bit confusing to
> > > > understand. I have filed JIRA some time ago to improve on this end:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3596
> > > >
> > > > So to your example, if a "far future record" was received whose
> > timestamp
> > > > is beyond current time + the retention period, it could potentially
> > cause
> > > > the current window to be dropped.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > > I think now it makes all the sense. The field I was using for
> > timestamp
> > > > > extractor contains timestamps which spans for greater than a day's
> > > > duration
> > > > > and it worked for wall clock because for short duration timestamps
> > were
> > > > in
> > > > > day's range.
> > > > >
> > > > > I wanted to understand one thing:
> > > > > Say I have a timestamp extractor field and as record gets ingested
> > > future
> > > > > records will have increasing values for the timestamp.
> > > > >
> > > > > Now lets say default duration is one day. At a future time a record
> > > will
> > > > > have timestamp which now is greater than the initial day's range.
> > > > > What will happen then, it will create a new segment and then create
> > > > windows
> > > > > in it for the next day's duration?
> > > > > What happens if now it gets a record from the previous day, will it
> > get
> > > > > discarded or will it again have just the single value aggregated in
> > it
> > > > > (previous values are lost).
> > > > > So when new segment is create as I understand does it retain the
> > older
> > > > > segments data.
> > > > >
> > > > > This is bit confusing, so would be helpful if you can explain in
> bit
> > > more
> > > > > detail.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Sachin,
> > > > > >
> > > > > > One thing to note is that the retention of the windowed stores
> > works
> > > by
> > > > > > keeping multiple segments of the stores where each segments
> stores
> > a
> > > > time
> > > > > > range which can potentially span multiple windows, if a new
> window
> > > > needs
> > > > > to
> > > > > > be created that is further from the oldest segment's time range +
> > > > > retention
> > > > > > period (from your code it seems you do not override it from
> > > > > > TimeWindows.of("stream-table",
> > > > > > 10 * 1000L).advanceBy(5 * 1000L), via until(...)), so the default
> > of
> > > > one
> > > > > > day is used.
> > > > > >
> > > > > > So with WallclockTimeExtractor since it is using system time, it
> > wont
> > > > > give
> > > > > > you timestamps that span for more than a day during a short
> period
> > of
> > > > > time,
> > > > > > but if your own defined timestamps expand that value, then old
> > > segments
> > > > > > will be dropped immediately and hence the aggregate values will
> be
> > > > > returned
> > > > > > as a single value.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <
> > > > matth...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > The extractor is used in
> > > > > > >
> > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > RecordQueue#addRawRecords()
> > > > > > >
> > > > > > > Let us know, if you could resolve the problem or need more
> help.
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > > > > > > > https://github.com/SOHU-Co/kafka-node/ this is the node js
> > > client
> > > > i
> > > > > am
> > > > > > > > using. The version is 0.5x. Can you please tell me what code
> in
> > > > > streams
> > > > > > > > calls the timestamp extractor. I can look there to see if
> there
> > > is
> > > > > any
> > > > > > > > issue.
> > > > > > > >
> > > > > > > > Again issue happens only when producing the messages using
> > > producer
> > > > > > that
> > > > > > > is
> > > > > > > > compatible with kafka version 0.8x. I see that this producer
> > does
> > > > not
> > > > > > > send
> > > > > > > > a record timestamp as this was introduced in version 0.10
> only.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
> > > > > > > >
> > > > > > > > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <
> > > matth...@confluent.io>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> I am not sure what is happening. That's why it would be good
> > to
> > > > > have a
> > > > > > > >> toy example to reproduce the issue.
> > > > > > > >>
> > > > > > > >> What do you mean by "Kafka node version 0.5"?
> > > > > > > >>
> > > > > > > >> -Matthias
> > > > > > > >>
> > > > > > > >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > > > > > > >>> I can provide with the data but data does not seem to be
> the
> > > > issue.
> > > > > > > >>> If I submit the same data and use same timestamp extractor
> > > using
> > > > > the
> > > > > > > >> java
> > > > > > > >>> client with kafka version 0.10.0.1 aggregation works fine.
> > > > > > > >>> I find the issue only when submitting the data with kafka
> > node
> > > > > > version
> > > > > > > >> 0.5.
> > > > > > > >>> It looks like the stream does not extract the time
> correctly
> > in
> > > > > that
> > > > > > > >> case.
> > > > > > > >>>
> > > > > > > >>> Thanks
> > > > > > > >>> Sachin
> > > > > > > >>>
> > > > > > > >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <
> > > > matth...@confluent.io
> > > > > >
> > > > > > > >> wrote:
> > > > > > > >>>
> > > > > > > >>>> Can you provide example input data (including timetamps)
> and
> > > > > result.
> > > > > > > >>>> What is the expected result (ie, what aggregation do you
> > > apply)?
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>> -Matthias
> > > > > > > >>>>
> > > > > > > >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > > > > > > >>>>> Hi,
> > > > > > > >>>>> After much debugging I found an issue with timestamp
> > > extractor.
> > > > > > > >>>>>
> > > > > > > >>>>> If I use a custom timestamp extractor with following
> code:
> > > > > > > >>>>>     public static class MessageTimestampExtractor
> > implements
> > > > > > > >>>>> TimestampExtractor {
> > > > > > > >>>>>         public long extract(ConsumerRecord<Object,
> Object>
> > > > > record)
> > > > > > {
> > > > > > > >>>>>             if (record.value() instanceof Message) {
> > > > > > > >>>>>                 return ((Message) record.value()).ts;
> > > > > > > >>>>>             } else {
> > > > > > > >>>>>                 return record.timestamp();
> > > > > > > >>>>>             }
> > > > > > > >>>>>         }
> > > > > > > >>>>>     }
> > > > > > > >>>>>
> > > > > > > >>>>> Here message has a long field ts which stores the
> > timestamp,
> > > > the
> > > > > > > >>>>> aggregation does not work.
> > > > > > > >>>>> Note I have checked and ts has valid timestamp values.
> > > > > > > >>>>>
> > > > > > > >>>>> However if I replace it with say
> > WallclockTimestampExtractor
> > > > > > > >> aggregation
> > > > > > > >>>> is
> > > > > > > >>>>> working fine.
> > > > > > > >>>>>
> > > > > > > >>>>> I do not understand what could be the issue here.
> > > > > > > >>>>>
> > > > > > > >>>>> Also note I am using kafka streams version 0.10.0.1 and I
> > am
> > > > > > > publishing
> > > > > > > >>>>> messages via
> > > > > > > >>>>> https://github.com/SOHU-Co/kafka-node/ whose version is
> > > quite
> > > > > old
> > > > > > > >> 0.5.x
> > > > > > > >>>>>
> > > > > > > >>>>> Let me know if there is some bug in time stamp
> extractions.
> > > > > > > >>>>>
> > > > > > > >>>>> Thanks
> > > > > > > >>>>> Sachin
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <
> > > > > > wangg...@gmail.com>
> > > > > > > >>>> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> Sachin,
> > > > > > > >>>>>>
> > > > > > > >>>>>> This is indeed a bit wired, and we'd like to try to
> > > re-produce
> > > > > > your
> > > > > > > >>>> issue
> > > > > > > >>>>>> locally. Do you have a sample input data for us to try
> > out?
> > > > > > > >>>>>>
> > > > > > > >>>>>> Guozhang
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <
> > > > > > sjmit...@gmail.com
> > > > > > > >
> > > > > > > >>>>>> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hi,
> > > > > > > >>>>>>> I fixed that sorted set issue but I am facing a weird
> > > problem
> > > > > > > which I
> > > > > > > >>>> am
> > > > > > > >>>>>>> not able to replicate.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Here is the sample problem that I could isolate:
> > > > > > > >>>>>>> My class is like this:
> > > > > > > >>>>>>>     public static class Message implements
> > > > Comparable<Message>
> > > > > {
> > > > > > > >>>>>>>         public long ts;
> > > > > > > >>>>>>>         public String message;
> > > > > > > >>>>>>>         public String key;
> > > > > > > >>>>>>>         public Message() {};
> > > > > > > >>>>>>>         public Message(long ts, String message, String
> > > key) {
> > > > > > > >>>>>>>             this.ts = ts;
> > > > > > > >>>>>>>             this.key = key;
> > > > > > > >>>>>>>             this.message = message;
> > > > > > > >>>>>>>         }
> > > > > > > >>>>>>>         public int compareTo(Message paramT) {
> > > > > > > >>>>>>>             long ts1 = paramT.ts;
> > > > > > > >>>>>>>             return ts > ts1 ? 1 : -1;
> > > > > > > >>>>>>>         }
> > > > > > > >>>>>>>     }
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> pipeline is like this:
> > > > > > > >>>>>>> builder.stream(Serdes.String(), messageSerde,
> > > > > > > "test-window-stream")\
> > > > > > > >>>>>>>  .map(new KeyValueMapper<String, Message,
> > KeyValue<String,
> > > > > > > >> Message>>()
> > > > > > > >>>> {
> > > > > > > >>>>>>>      public KeyValue<String, Message> apply(String key,
> > > > Message
> > > > > > > >> value)
> > > > > > > >>>> {
> > > > > > > >>>>>>>          return new KeyValue<String,
> Message>(value.key,
> > > > > value);
> > > > > > > >>>>>>>       }
> > > > > > > >>>>>>>  })
> > > > > > > >>>>>>> .through(Serdes.String(), messageSerde,
> > > > > "test-window-key-stream")
> > > > > > > >>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>()
> {
> > > > > > > >>>>>>>     public SortedSet<Message> apply() {
> > > > > > > >>>>>>>         return new TreeSet<Message>();
> > > > > > > >>>>>>>     }
> > > > > > > >>>>>>> }, new Aggregator<String, Message,
> SortedSet<Message>>()
> > {
> > > > > > > >>>>>>>     public SortedSet<Message> apply(String aggKey,
> > Message
> > > > > value,
> > > > > > > >>>>>>> SortedSet<Message> aggregate) {
> > > > > > > >>>>>>>         aggregate.add(value);
> > > > > > > >>>>>>>         return aggregate;
> > > > > > > >>>>>>>     }
> > > > > > > >>>>>>> }, TimeWindows.of("stream-table", 10 *
> > 1000L).advanceBy(5 *
> > > > > > 1000L),
> > > > > > > >>>>>>> Serdes.String(), messagesSerde)
> > > > > > > >>>>>>> .foreach(new ForeachAction<Windowed<String>,
> > > > > > > SortedSet<Message>>() {
> > > > > > > >>>>>>>     public void apply(Windowed<String> key,
> > > > SortedSet<Message>
> > > > > > > >>>> messages)
> > > > > > > >>>>>> {
> > > > > > > >>>>>>>         ...
> > > > > > > >>>>>>>     }
> > > > > > > >>>>>>> });
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> So basically I rekey the original message into another
> > > topic
> > > > > and
> > > > > > > then
> > > > > > > >>>>>>> aggregate it based on that key.
> > > > > > > >>>>>>> What I have observed is that when I used windowed
> > > aggregation
> > > > > the
> > > > > > > >>>>>>> aggregator does not use previous aggregated value.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> public SortedSet<Message> apply(String aggKey, Message
> > > value,
> > > > > > > >>>>>>> SortedSet<Message> aggregate) {
> > > > > > > >>>>>>>     aggregate.add(value);
> > > > > > > >>>>>>>     return aggregate;
> > > > > > > >>>>>>> }
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> So in the above function the aggregate is an empty set
> of
> > > > every
> > > > > > > value
> > > > > > > >>>>>>> entering into pipeline. When I remove the windowed
> > > > aggregation,
> > > > > > the
> > > > > > > >>>>>>> aggregate set retains previously aggregated values in
> the
> > > > set.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> I am just not able to wrap my head around it. When I
> ran
> > > this
> > > > > > type
> > > > > > > of
> > > > > > > >>>>>> test
> > > > > > > >>>>>>> locally on windows it is working fine. However a
> similar
> > > > > pipeline
> > > > > > > >> setup
> > > > > > > >>>>>>> when run against production on linux is behaving
> > strangely
> > > > and
> > > > > > > always
> > > > > > > >>>>>>> getting an empty aggregate set.
> > > > > > > >>>>>>> Any idea what could be the reason, where should I look
> at
> > > the
> > > > > > > >> problem.
> > > > > > > >>>>>> Does
> > > > > > > >>>>>>> length of key string matters here? I will later try to
> > run
> > > > the
> > > > > > same
> > > > > > > >>>>>> simple
> > > > > > > >>>>>>> setup on linux and see what happens. But this is a very
> > > > strange
> > > > > > > >>>> behavior.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks
> > > > > > > >>>>>>> Sachin
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <
> > > > > > > wangg...@gmail.com>
> > > > > > > >>>>>>> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>> Hello Sachin,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> In the implementation of SortedSet, if the object's
> > > > > implemented
> > > > > > > the
> > > > > > > >>>>>>>> Comparable interface, that compareTo function is
> applied
> > > in
> > > > "
> > > > > > > >>>>>>>> aggregate.add(value);", and hence if it returns 0,
> this
> > > > > element
> > > > > > > will
> > > > > > > >>>>>> not
> > > > > > > >>>>>>> be
> > > > > > > >>>>>>>> added since it is a Set.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Guozhang
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <
> > > > > > > sjmit...@gmail.com
> > > > > > > >>>
> > > > > > > >>>>>>>> wrote:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Hi,
> > > > > > > >>>>>>>>> What I find is that when I use sorted set as
> > aggregation
> > > it
> > > > > > fails
> > > > > > > >> to
> > > > > > > >>>>>>>>> aggregate the values which have compareTo returning
> 0.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> My class is like this:
> > > > > > > >>>>>>>>>     public class Message implements
> > Comparable<Message> {
> > > > > > > >>>>>>>>>         public long ts;
> > > > > > > >>>>>>>>>         public String message;
> > > > > > > >>>>>>>>>         public Message() {};
> > > > > > > >>>>>>>>>         public Message(long ts, String message) {
> > > > > > > >>>>>>>>>             this.ts = ts;
> > > > > > > >>>>>>>>>             this.message = message;
> > > > > > > >>>>>>>>>         }
> > > > > > > >>>>>>>>>         public int compareTo(Message paramT) {
> > > > > > > >>>>>>>>>             long ts1 = paramT.ts;
> > > > > > > >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > > > > > >>>>>>>>>         }
> > > > > > > >>>>>>>>>     }
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> pipeline is like this:
> > > > > > > >>>>>>>>> builder.stream(Serdes.String(), messageSerde,
> > > > > > > >> "test-window-stream")
> > > > > > > >>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>
> >()
> > {
> > > > > > > >>>>>>>>>     public SortedSet<Message> apply() {
> > > > > > > >>>>>>>>>         return new TreeSet<Message>();
> > > > > > > >>>>>>>>>     }
> > > > > > > >>>>>>>>> }, new Aggregator<String, Message,
> > SortedSet<Message>>()
> > > {
> > > > > > > >>>>>>>>>     public SortedSet<Message> apply(String aggKey,
> > > Message
> > > > > > value,
> > > > > > > >>>>>>>>> SortedSet<Message> aggregate) {
> > > > > > > >>>>>>>>>         aggregate.add(value);
> > > > > > > >>>>>>>>>         return aggregate;
> > > > > > > >>>>>>>>>     }
> > > > > > > >>>>>>>>> }, TimeWindows.of("stream-table", 10 *
> > > 1000L).advanceBy(5 *
> > > > > > > 1000L),
> > > > > > > >>>>>>>>> Serdes.String(), messagesSerde)
> > > > > > > >>>>>>>>> .foreach(new ForeachAction<Windowed<String>,
> > > > > > > >> SortedSet<Message>>() {
> > > > > > > >>>>>>>>>     public void apply(Windowed<String> key,
> > > > > SortedSet<Message>
> > > > > > > >>>>>>> messages)
> > > > > > > >>>>>>>> {
> > > > > > > >>>>>>>>>         ...
> > > > > > > >>>>>>>>>     }
> > > > > > > >>>>>>>>> });
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> So any message published between 10 and 20 seconds
> gets
> > > > > > > aggregated
> > > > > > > >> in
> > > > > > > >>>>>>> 10
> > > > > > > >>>>>>>> -
> > > > > > > >>>>>>>>> 20 bucket and I print the size of the set.
> > > > > > > >>>>>>>>> However output I get is following:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 14
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 1
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 18
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 2
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 11
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 3
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 17
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 14
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 4
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 15
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 5
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 12
> > > > > > > >>>>>>>>> Aggregated: key2  10  20 -> 6
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Published: 12
> > > > > > > >>>>>>>>> Aggregated: 10  20 -> 6
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> So if you see any message that occurs again for same
> > > > second,
> > > > > > > where
> > > > > > > >>>>>>>>> compareTo returns 0, it fails to get aggregated in
> the
> > > > > > pipeline.
> > > > > > > >>>>>>>>> Notice ones published at 14 and 12 seconds.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Now I am not sure if problem is with Java ie I should
> > use
> > > > > > > >> Comparator
> > > > > > > >>>>>>>>> interface and not Comparable for my Message object.
> Or
> > > the
> > > > > > > problem
> > > > > > > >> is
> > > > > > > >>>>>>>> with
> > > > > > > >>>>>>>>> Kafka stream or with serializing and de-serializing
> the
> > > set
> > > > > of
> > > > > > > >>>>>>> messages.
> > > > > > > >>>>>>>> If
> > > > > > > >>>>>>>>> I replace Set with List all is working fine.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Anyway any ideas here would be appreciated, meanwhile
> > let
> > > > me
> > > > > > see
> > > > > > > >> what
> > > > > > > >>>>>>> is
> > > > > > > >>>>>>>>> the best java practice here.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Thanks
> > > > > > > >>>>>>>>> Sachin
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <
> > > > > > > >> mich...@confluent.io>
> > > > > > > >>>>>>>>> wrote:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <
> > > > > > > >> sjmit...@gmail.com
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> wrote:
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> > > > > > > >>>>>>>>>>> Say I am having a window of 60 minutes advanced by
> 15
> > > > > > minutes.
> > > > > > > >>>>>>>>>>> If the stream app using timestamp extractor puts
> the
> > > > > message
> > > > > > in
> > > > > > > >>>>>> one
> > > > > > > >>>>>>>> or
> > > > > > > >>>>>>>>>> more
> > > > > > > >>>>>>>>>>> bucket(s), it will get aggregated in those buckets.
> > > > > > > >>>>>>>>>>> I assume this statement is correct.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Yes.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Also say when I restart the streams application
> then
> > > > bucket
> > > > > > > >>>>>>>> aggregation
> > > > > > > >>>>>>>>>>> will resume from last point of halt.
> > > > > > > >>>>>>>>>>> I hope this is also correct.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Yes.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> What I noticed that once a message is placed in one
> > > > bucket,
> > > > > > > that
> > > > > > > >>>>>>>> bucket
> > > > > > > >>>>>>>>>> was
> > > > > > > >>>>>>>>>>> not getting new messages.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> This should not happen...
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> However when I ran a small test case replicating
> > that,
> > > it
> > > > > is
> > > > > > > >>>>>>> working
> > > > > > > >>>>>>>>>>> properly. There maybe some issues in application
> > reset.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> ...and apparently it works (as expected) in your
> small
> > > > test
> > > > > > > case.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Do you have any further information that you could
> > share
> > > > > with
> > > > > > us
> > > > > > > >> so
> > > > > > > >>>>>>> we
> > > > > > > >>>>>>>>> can
> > > > > > > >>>>>>>>>> help you better?  What's the difference, for
> example,
> > > > > between
> > > > > > > your
> > > > > > > >>>>>>>>> "normal"
> > > > > > > >>>>>>>>>> use case and the small test case you have been
> > referring
> > > > to?
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> -Michael
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> --
> > > > > > > >>>>>>>> -- Guozhang
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> --
> > > > > > > >>>>>> -- Guozhang
> > > > > > > >>>>>>
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to