Sachin,

One thing to note is that retention for windowed stores works by keeping
multiple segments of the store, where each segment covers a time range that
can span multiple windows. If a new window needs to be created that lies
further out than the oldest segment's time range + retention period, the
oldest segments are dropped. From your code it seems you do not override the
retention period via until(...) on
TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
so the default of one day is used.

So with WallclockTimestampExtractor, since it uses system time, you won't
get timestamps that span more than a day within a short period of time. But
if the timestamps you define yourself span more than that value, old
segments will be dropped immediately, and hence the aggregate will appear to
hold only a single value.
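
To make the effect concrete, here is a minimal sketch in plain Java of
segment-based retention. It is a simplified model for illustration only, not
the actual Kafka Streams store code: the class name SegmentRetentionSketch,
the segment width SEGMENT_MS and the add(...) method are all hypothetical.
Only the one-day retention default comes from the explanation above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of a segmented windowed store.
class SegmentRetentionSketch {
    // One-day default retention, as described above.
    static final long RETENTION_MS = 24 * 60 * 60 * 1000L;
    // Assumed segment width, chosen only for this illustration.
    static final long SEGMENT_MS = RETENTION_MS / 2;

    // segment id -> values aggregated in that segment
    private final TreeMap<Long, List<String>> segments = new TreeMap<>();
    private long maxTimestamp = Long.MIN_VALUE;

    // Adds a value; returns the size of its segment afterwards,
    // or 0 if the record falls into an already expired segment.
    int add(long timestampMs, String value) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
        // Drop every segment that lies entirely before
        // (newest timestamp seen - retention).
        long minLiveSegment =
            Math.floorDiv(maxTimestamp - RETENTION_MS, SEGMENT_MS);
        segments.headMap(minLiveSegment, false).clear();

        long segmentId = Math.floorDiv(timestampMs, SEGMENT_MS);
        if (segmentId < minLiveSegment) {
            return 0;
        }
        List<String> bucket =
            segments.computeIfAbsent(segmentId, id -> new ArrayList<>());
        bucket.add(value);
        return bucket.size();
    }

    public static void main(String[] args) {
        SegmentRetentionSketch store = new SegmentRetentionSketch();
        long threeDays = 3 * RETENTION_MS;
        System.out.println(store.add(1_000L, "a"));    // prints 1
        System.out.println(store.add(2_000L, "b"));    // prints 2
        // A timestamp far ahead expires the old segment.
        System.out.println(store.add(threeDays, "c")); // prints 1
        // An old timestamp now lands in a dropped segment.
        System.out.println(store.add(3_000L, "d"));    // prints 0
    }
}
```

The first two calls are the normal case you get with wall-clock time. With
your own timestamps, a single record far ahead of the others is enough to
expire everything older, so later records with old timestamps start from an
empty aggregate.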

Guozhang


On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> The extractor is used in
>
> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>
> Let us know, if you could resolve the problem or need more help.
>
> -Matthias
>
> On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > https://github.com/SOHU-Co/kafka-node/ this is the node js client i am
> > using. The version is 0.5x. Can you please tell me what code in streams
> > calls the timestamp extractor. I can look there to see if there is any
> > issue.
> >
> > Again issue happens only when producing the messages using producer that
> is
> > compatible with kafka version 0.8x. I see that this producer does not
> send
> > a record timestamp as this was introduced in version 0.10 only.
> >
> > Thanks
> > Sachin
> >
> > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io>
> wrote:
> >
> >> I am not sure what is happening. That's why it would be good to have a
> >> toy example to reproduce the issue.
> >>
> >> What do you mean by "Kafka node version 0.5"?
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> >>> I can provide with the data but data does not seem to be the issue.
> >>> If I submit the same data and use same timestamp extractor  using the
> >> java
> >>> client with kafka version 0.10.0.1 aggregation works fine.
> >>> I find the issue only when submitting the data with kafka node version
> >> 0.5.
> >>> It looks like the stream does not extract the time correctly in that
> >> case.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io>
> >> wrote:
> >>>
> >>>> Can you provide example input data (including timestamps) and result.
> >>>> What is the expected result (ie, what aggregation do you apply)?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> >>>>> Hi,
> >>>>> After much debugging I found an issue with timestamp extractor.
> >>>>>
> >>>>> If I use a custom timestamp extractor with following code:
> >>>>>     public static class MessageTimestampExtractor implements
> >>>>> TimestampExtractor {
> >>>>>         public long extract(ConsumerRecord<Object, Object> record) {
> >>>>>             if (record.value() instanceof Message) {
> >>>>>                 return ((Message) record.value()).ts;
> >>>>>             } else {
> >>>>>                 return record.timestamp();
> >>>>>             }
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>> Here message has a long field ts which stores the timestamp, the
> >>>>> aggregation does not work.
> >>>>> Note I have checked and ts has valid timestamp values.
> >>>>>
> >>>>> However if I replace it with say WallclockTimestampExtractor
> >> aggregation
> >>>> is
> >>>>> working fine.
> >>>>>
> >>>>> I do not understand what could be the issue here.
> >>>>>
> >>>>> Also note I am using kafka streams version 0.10.0.1 and I am
> publishing
> >>>>> messages via
> >>>>> https://github.com/SOHU-Co/kafka-node/ whose version is quite old
> >> 0.5.x
> >>>>>
> >>>>> Let me know if there is some bug in time stamp extractions.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Sachin,
> >>>>>>
> >>>>>> This is indeed a bit weird, and we'd like to try to reproduce your
> >>>> issue
> >>>>>> locally. Do you have a sample input data for us to try out?
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com
> >
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>> I fixed that sorted set issue but I am facing a weird problem
> which I
> >>>> am
> >>>>>>> not able to replicate.
> >>>>>>>
> >>>>>>> Here is the sample problem that I could isolate:
> >>>>>>> My class is like this:
> >>>>>>>     public static class Message implements Comparable<Message> {
> >>>>>>>         public long ts;
> >>>>>>>         public String message;
> >>>>>>>         public String key;
> >>>>>>>         public Message() {};
> >>>>>>>         public Message(long ts, String message, String key) {
> >>>>>>>             this.ts = ts;
> >>>>>>>             this.key = key;
> >>>>>>>             this.message = message;
> >>>>>>>         }
> >>>>>>>         public int compareTo(Message paramT) {
> >>>>>>>             long ts1 = paramT.ts;
> >>>>>>>             return ts > ts1 ? 1 : -1;
> >>>>>>>         }
> >>>>>>>     }
> >>>>>>>
> >>>>>>> pipeline is like this:
> >>>>>>> builder.stream(Serdes.String(), messageSerde,
> "test-window-stream")
> >>>>>>>  .map(new KeyValueMapper<String, Message, KeyValue<String,
> >> Message>>()
> >>>> {
> >>>>>>>      public KeyValue<String, Message> apply(String key, Message
> >> value)
> >>>> {
> >>>>>>>          return new KeyValue<String, Message>(value.key, value);
> >>>>>>>       }
> >>>>>>>  })
> >>>>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>>>     public SortedSet<Message> apply() {
> >>>>>>>         return new TreeSet<Message>();
> >>>>>>>     }
> >>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>>> SortedSet<Message> aggregate) {
> >>>>>>>         aggregate.add(value);
> >>>>>>>         return aggregate;
> >>>>>>>     }
> >>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>>>> Serdes.String(), messagesSerde)
> >>>>>>> .foreach(new ForeachAction<Windowed<String>,
> SortedSet<Message>>() {
> >>>>>>>     public void apply(Windowed<String> key, SortedSet<Message>
> >>>> messages)
> >>>>>> {
> >>>>>>>         ...
> >>>>>>>     }
> >>>>>>> });
> >>>>>>>
> >>>>>>> So basically I rekey the original message into another topic and
> then
> >>>>>>> aggregate it based on that key.
> >>>>>>> What I have observed is that when I used windowed aggregation the
> >>>>>>> aggregator does not use previous aggregated value.
> >>>>>>>
> >>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>>> SortedSet<Message> aggregate) {
> >>>>>>>     aggregate.add(value);
> >>>>>>>     return aggregate;
> >>>>>>> }
> >>>>>>>
> >>>>>>> So in the above function the aggregate is an empty set for every
> value
> >>>>>>> entering into pipeline. When I remove the windowed aggregation, the
> >>>>>>> aggregate set retains previously aggregated values in the set.
> >>>>>>>
> >>>>>>> I am just not able to wrap my head around it. When I ran this type
> of
> >>>>>> test
> >>>>>>> locally on windows it is working fine. However a similar pipeline
> >> setup
> >>>>>>> when run against production on linux is behaving strangely and
> always
> >>>>>>> getting an empty aggregate set.
> >>>>>>> Any idea what could be the reason, where should I look at the
> >> problem.
> >>>>>> Does
> >>>>>>> length of key string matters here? I will later try to run the same
> >>>>>> simple
> >>>>>>> setup on linux and see what happens. But this is a very strange
> >>>> behavior.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <
> wangg...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Sachin,
> >>>>>>>>
> >>>>>>>> In the implementation of SortedSet, if the object implements
> the
> >>>>>>>> Comparable interface, that compareTo function is applied in "
> >>>>>>>> aggregate.add(value);", and hence if it returns 0, this element
> will
> >>>>>> not
> >>>>>>> be
> >>>>>>>> added since it is a Set.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <
> sjmit...@gmail.com
> >>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>> What I find is that when I use sorted set as aggregation it fails
> >> to
> >>>>>>>>> aggregate the values which have compareTo returning 0.
> >>>>>>>>>
> >>>>>>>>> My class is like this:
> >>>>>>>>>     public class Message implements Comparable<Message> {
> >>>>>>>>>         public long ts;
> >>>>>>>>>         public String message;
> >>>>>>>>>         public Message() {};
> >>>>>>>>>         public Message(long ts, String message) {
> >>>>>>>>>             this.ts = ts;
> >>>>>>>>>             this.message = message;
> >>>>>>>>>         }
> >>>>>>>>>         public int compareTo(Message paramT) {
> >>>>>>>>>             long ts1 = paramT.ts;
> >>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> >>>>>>>>>         }
> >>>>>>>>>     }
> >>>>>>>>>
> >>>>>>>>> pipeline is like this:
> >>>>>>>>> builder.stream(Serdes.String(), messageSerde,
> >> "test-window-stream")
> >>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>>>>>     public SortedSet<Message> apply() {
> >>>>>>>>>         return new TreeSet<Message>();
> >>>>>>>>>     }
> >>>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>>>>> SortedSet<Message> aggregate) {
> >>>>>>>>>         aggregate.add(value);
> >>>>>>>>>         return aggregate;
> >>>>>>>>>     }
> >>>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 *
> 1000L),
> >>>>>>>>> Serdes.String(), messagesSerde)
> >>>>>>>>> .foreach(new ForeachAction<Windowed<String>,
> >> SortedSet<Message>>() {
> >>>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message>
> >>>>>>> messages)
> >>>>>>>> {
> >>>>>>>>>         ...
> >>>>>>>>>     }
> >>>>>>>>> });
> >>>>>>>>>
> >>>>>>>>> So any message published between 10 and 20 seconds gets
> aggregated
> >> in
> >>>>>>> 10
> >>>>>>>> -
> >>>>>>>>> 20 bucket and I print the size of the set.
> >>>>>>>>> However output I get is following:
> >>>>>>>>>
> >>>>>>>>> Published: 14
> >>>>>>>>> Aggregated: 10  20 -> 1
> >>>>>>>>>
> >>>>>>>>> Published: 18
> >>>>>>>>> Aggregated: 10  20 -> 2
> >>>>>>>>>
> >>>>>>>>> Published: 11
> >>>>>>>>> Aggregated: 10  20 -> 3
> >>>>>>>>>
> >>>>>>>>> Published: 17
> >>>>>>>>> Aggregated: 10  20 -> 4
> >>>>>>>>>
> >>>>>>>>> Published: 14
> >>>>>>>>> Aggregated: 10  20 -> 4
> >>>>>>>>>
> >>>>>>>>> Published: 15
> >>>>>>>>> Aggregated: 10  20 -> 5
> >>>>>>>>>
> >>>>>>>>> Published: 12
> >>>>>>>>> Aggregated: key2  10  20 -> 6
> >>>>>>>>>
> >>>>>>>>> Published: 12
> >>>>>>>>> Aggregated: 10  20 -> 6
> >>>>>>>>>
> >>>>>>>>> So if you see any message that occurs again for same second,
> where
> >>>>>>>>> compareTo returns 0, it fails to get aggregated in the pipeline.
> >>>>>>>>> Notice ones published at 14 and 12 seconds.
> >>>>>>>>>
> >>>>>>>>> Now I am not sure if problem is with Java ie I should use
> >> Comparator
> >>>>>>>>> interface and not Comparable for my Message object. Or the
> problem
> >> is
> >>>>>>>> with
> >>>>>>>>> Kafka stream or with serializing and de-serializing the set of
> >>>>>>> messages.
> >>>>>>>> If
> >>>>>>>>> I replace Set with List all is working fine.
> >>>>>>>>>
> >>>>>>>>> Anyway any ideas here would be appreciated, meanwhile let me see
> >> what
> >>>>>>> is
> >>>>>>>>> the best java practice here.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Sachin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <
> >> mich...@confluent.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <
> >> sjmit...@gmail.com
> >>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
> >>>>>>>>>>> Say I am having a window of 60 minutes advanced by 15 minutes.
> >>>>>>>>>>> If the stream app using timestamp extractor puts the message in
> >>>>>> one
> >>>>>>>> or
> >>>>>>>>>> more
> >>>>>>>>>>> bucket(s), it will get aggregated in those buckets.
> >>>>>>>>>>> I assume this statement is correct.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Yes.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Also say when I restart the streams application then bucket
> >>>>>>>> aggregation
> >>>>>>>>>>> will resume from last point of halt.
> >>>>>>>>>>> I hope this is also correct.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Yes.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> What I noticed that once a message is placed in one bucket,
> that
> >>>>>>>> bucket
> >>>>>>>>>> was
> >>>>>>>>>>> not getting new messages.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> This should not happen...
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> However when I ran a small test case replicating that, it is
> >>>>>>> working
> >>>>>>>>>>> properly. There maybe some issues in application reset.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ...and apparently it works (as expected) in your small test
> case.
> >>>>>>>>>>
> >>>>>>>>>> Do you have any further information that you could share with us
> >> so
> >>>>>>> we
> >>>>>>>>> can
> >>>>>>>>>> help you better?  What's the difference, for example, between
> your
> >>>>>>>>> "normal"
> >>>>>>>>>> use case and the small test case you have been referring to?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Michael
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>


-- 
-- Guozhang
