I can provide with the data but data does not seem to be the issue.
If I submit the same data and use same timestamp extractor  using the java
client with kafka version 0.10.0.1 aggregation works fine.
I find the issue only when submitting the data with kafka node version 0.5.
It looks like the stream does not extract the time correctly in that case.

Thanks
Sachin

On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

> Can you provide example input data (including timetamps) and result.
> What is the expected result (ie, what aggregation do you apply)?
>
>
> -Matthias
>
> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > Hi,
> > After much debugging I found an issue with timestamp extractor.
> >
> > If I use a custom timestamp extractor with following code:
> >     public static class MessageTimestampExtractor implements
> > TimestampExtractor {
> >         public long extract(ConsumerRecord<Object, Object> record) {
> >             if (record.value() instanceof Message) {
> >                 return ((Message) record.value()).ts;
> >             } else {
> >                 return record.timestamp();
> >             }
> >         }
> >     }
> >
> > Here message has a long field ts which stores the timestamp, the
> > aggregation does not work.
> > Note I have checked and ts has valid timestamp values.
> >
> > However if I replace it with say WallclockTimestampExtractor aggregation
> is
> > working fine.
> >
> > I do not understand what could be the issue here.
> >
> > Also note I am using kafka streams version 0.10.0.1 and I am publishing
> > messages via
> > https://github.com/SOHU-Co/kafka-node/ whose version is quite old 0.5.x
> >
> > Let me know if there is some bug in time stamp extractions.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Sachin,
> >>
> >> This is indeed a bit wired, and we'd like to try to re-produce your
> issue
> >> locally. Do you have a sample input data for us to try out?
> >>
> >> Guozhang
> >>
> >> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>> I fixed that sorted set issue but I am facing a weird problem which I
> am
> >>> not able to replicate.
> >>>
> >>> Here is the sample problem that I could isolate:
> >>> My class is like this:
> >>>     public static class Message implements Comparable<Message> {
> >>>         public long ts;
> >>>         public String message;
> >>>         public String key;
> >>>         public Message() {};
> >>>         public Message(long ts, String message, String key) {
> >>>             this.ts = ts;
> >>>             this.key = key;
> >>>             this.message = message;
> >>>         }
> >>>         public int compareTo(Message paramT) {
> >>>             long ts1 = paramT.ts;
> >>>             return ts > ts1 ? 1 : -1;
> >>>         }
> >>>     }
> >>>
> >>> pipeline is like this:
> >>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
> >>>  .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>()
> {
> >>>      public KeyValue<String, Message> apply(String key, Message value)
> {
> >>>          return new KeyValue<String, Message>(value.key, value);
> >>>       }
> >>>  })
> >>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>     public SortedSet<Message> apply() {
> >>>         return new TreeSet<Message>();
> >>>     }
> >>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>> SortedSet<Message> aggregate) {
> >>>         aggregate.add(value);
> >>>         return aggregate;
> >>>     }
> >>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>> Serdes.String(), messagesSerde)
> >>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>     public void apply(Windowed<String> key, SortedSet<Message>
> messages)
> >> {
> >>>         ...
> >>>     }
> >>> });
> >>>
> >>> So basically I rekey the original message into another topic and then
> >>> aggregate it based on that key.
> >>> What I have observed is that when I used windowed aggregation the
> >>> aggregator does not use previous aggregated value.
> >>>
> >>> public SortedSet<Message> apply(String aggKey, Message value,
> >>> SortedSet<Message> aggregate) {
> >>>     aggregate.add(value);
> >>>     return aggregate;
> >>> }
> >>>
> >>> So in the above function the aggregate is an empty set of every value
> >>> entering into pipeline. When I remove the windowed aggregation, the
> >>> aggregate set retains previously aggregated values in the set.
> >>>
> >>> I am just not able to wrap my head around it. When I ran this type of
> >> test
> >>> locally on windows it is working fine. However a similar pipeline setup
> >>> when run against production on linux is behaving strangely and always
> >>> getting an empty aggregate set.
> >>> Any idea what could be the reason, where should I look at the problem.
> >> Does
> >>> length of key string matters here? I will later try to run the same
> >> simple
> >>> setup on linux and see what happens. But this is a very strange
> behavior.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>>
> >>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello Sachin,
> >>>>
> >>>> In the implementation of SortedSet, if the object's implemented the
> >>>> Comparable interface, that compareTo function is applied in "
> >>>> aggregate.add(value);", and hence if it returns 0, this element will
> >> not
> >>> be
> >>>> added since it is a Set.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>> What I find is that when I use sorted set as aggregation it fails to
> >>>>> aggregate the values which have compareTo returning 0.
> >>>>>
> >>>>> My class is like this:
> >>>>>     public class Message implements Comparable<Message> {
> >>>>>         public long ts;
> >>>>>         public String message;
> >>>>>         public Message() {};
> >>>>>         public Message(long ts, String message) {
> >>>>>             this.ts = ts;
> >>>>>             this.message = message;
> >>>>>         }
> >>>>>         public int compareTo(Message paramT) {
> >>>>>             long ts1 = paramT.ts;
> >>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>> pipeline is like this:
> >>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>     public SortedSet<Message> apply() {
> >>>>>         return new TreeSet<Message>();
> >>>>>     }
> >>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>> SortedSet<Message> aggregate) {
> >>>>>         aggregate.add(value);
> >>>>>         return aggregate;
> >>>>>     }
> >>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>> Serdes.String(), messagesSerde)
> >>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>>>     public void apply(Windowed<String> key, SortedSet<Message>
> >>> messages)
> >>>> {
> >>>>>         ...
> >>>>>     }
> >>>>> });
> >>>>>
> >>>>> So any message published between 10 and 20 seconds gets aggregated in
> >>> 10
> >>>> -
> >>>>> 20 bucket and I print the size of the set.
> >>>>> However output I get is following:
> >>>>>
> >>>>> Published: 14
> >>>>> Aggregated: 10  20 -> 1
> >>>>>
> >>>>> Published: 18
> >>>>> Aggregated: 10  20 -> 2
> >>>>>
> >>>>> Published: 11
> >>>>> Aggregated: 10  20 -> 3
> >>>>>
> >>>>> Published: 17
> >>>>> Aggregated: 10  20 -> 4
> >>>>>
> >>>>> Published: 14
> >>>>> Aggregated: 10  20 -> 4
> >>>>>
> >>>>> Published: 15
> >>>>> Aggregated: 10  20 -> 5
> >>>>>
> >>>>> Published: 12
> >>>>> Aggregated: key2  10  20 -> 6
> >>>>>
> >>>>> Published: 12
> >>>>> Aggregated: 10  20 -> 6
> >>>>>
> >>>>> So if you see any message that occurs again for same second, where
> >>>>> compareTo returns 0, it fails to get aggregated in the pipeline.
> >>>>> Notice ones published at 14 and 12 seconds.
> >>>>>
> >>>>> Now I am not sure if problem is with Java ie I should use Comparator
> >>>>> interface and not Comparable for my Message object. Or the problem is
> >>>> with
> >>>>> Kafka stream or with serializing and de-serializing the set of
> >>> messages.
> >>>> If
> >>>>> I replace Set with List all is working fine.
> >>>>>
> >>>>> Anyway any ideas here would be appreciated, meanwhile let me see what
> >>> is
> >>>>> the best java practice here.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com
> >>>
> >>>>> wrote:
> >>>>>>
> >>>>>>> I am using kafka_2.10-0.10.0.1.
> >>>>>>> Say I am having a window of 60 minutes advanced by 15 minutes.
> >>>>>>> If the stream app using timestamp extractor puts the message in
> >> one
> >>>> or
> >>>>>> more
> >>>>>>> bucket(s), it will get aggregated in those buckets.
> >>>>>>> I assume this statement is correct.
> >>>>>>>
> >>>>>>
> >>>>>> Yes.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> Also say when I restart the streams application then bucket
> >>>> aggregation
> >>>>>>> will resume from last point of halt.
> >>>>>>> I hope this is also correct.
> >>>>>>>
> >>>>>>
> >>>>>> Yes.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> What I noticed that once a message is placed in one bucket, that
> >>>> bucket
> >>>>>> was
> >>>>>>> not getting new messages.
> >>>>>>>
> >>>>>>
> >>>>>> This should not happen...
> >>>>>>
> >>>>>>
> >>>>>>> However when I ran a small test case replicating that, it is
> >>> working
> >>>>>>> properly. There maybe some issues in application reset.
> >>>>>>>
> >>>>>>
> >>>>>> ...and apparently it works (as expected) in your small test case.
> >>>>>>
> >>>>>> Do you have any further information that you could share with us so
> >>> we
> >>>>> can
> >>>>>> help you better?  What's the difference, for example, between your
> >>>>> "normal"
> >>>>>> use case and the small test case you have been referring to?
> >>>>>>
> >>>>>>
> >>>>>> -Michael
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>

Reply via email to