https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am using.
The version is 0.5.x. Can you please tell me which code in Streams calls
the timestamp extractor? I can look there to see if there is any issue.

Again, the issue happens only when producing the messages with a producer
that is compatible with Kafka version 0.8.x. I see that this producer does
not send a record timestamp, as this was introduced only in version 0.10.
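
To make the suspicion concrete, here is a minimal sketch of the fallback
logic an extractor could apply. It is plain Java with no Kafka dependency, so
the class and method names (TimestampFallback, chooseTimestamp) are
hypothetical. It also assumes that a record written by a pre-0.10 producer
shows up on the consumer side with a timestamp of -1; I have not verified
that against this setup.

```java
public class TimestampFallback {
    // Assumption: -1 is the sentinel a consumer reports when the record
    // carries no timestamp (e.g. it was written by a pre-0.10 producer).
    static final long NO_TIMESTAMP = -1L;

    /**
     * Picks the event time for a record.
     * payloadTs   - timestamp embedded in the message value, or null if absent
     * recordTs    - timestamp reported by the consumer record
     * wallClockMs - current wall-clock time, used as the last resort
     */
    static long chooseTimestamp(Long payloadTs, long recordTs, long wallClockMs) {
        if (payloadTs != null && payloadTs >= 0) {
            return payloadTs;      // prefer the timestamp carried in the payload
        }
        if (recordTs >= 0) {
            return recordTs;       // otherwise use the record timestamp if valid
        }
        return wallClockMs;        // never hand a negative timestamp downstream
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Payload carries a timestamp: it wins even if the record has none.
        System.out.println(chooseTimestamp(1480700000000L, NO_TIMESTAMP, now));
        // No payload timestamp and no record timestamp: falls back to wall clock.
        System.out.println(chooseTimestamp(null, NO_TIMESTAMP, now) == now);
    }
}
```

In the extractor quoted below, the else branch returns record.timestamp()
directly, so under the assumption above any value that is not deserialized
as a Message would yield -1.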

Thanks
Sachin

On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

> I am not sure what is happening. That's why it would be good to have a
> toy example to reproduce the issue.
>
> What do you mean by "Kafka node version 0.5"?
>
> -Matthias
>
> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > I can provide the data, but the data does not seem to be the issue.
> > If I submit the same data and use the same timestamp extractor using the
> > Java client with Kafka version 0.10.0.1, aggregation works fine.
> > I find the issue only when submitting the data with kafka node version 0.5.
> > It looks like the stream does not extract the time correctly in that case.
> >
> > Thanks
> > Sachin
> >
> > On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
> >
> >> Can you provide example input data (including timestamps) and the result?
> >> What is the expected result (i.e., what aggregation do you apply)?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> After much debugging I found an issue with timestamp extractor.
> >>>
> >>> If I use a custom timestamp extractor with following code:
> >>>     public static class MessageTimestampExtractor implements
> >>> TimestampExtractor {
> >>>         public long extract(ConsumerRecord<Object, Object> record) {
> >>>             if (record.value() instanceof Message) {
> >>>                 return ((Message) record.value()).ts;
> >>>             } else {
> >>>                 return record.timestamp();
> >>>             }
> >>>         }
> >>>     }
> >>>
> >>> Here Message has a long field ts which stores the timestamp. With this
> >>> extractor, the aggregation does not work.
> >>> Note I have checked and ts has valid timestamp values.
> >>>
> >>> However, if I replace it with, say, WallclockTimestampExtractor,
> >>> aggregation works fine.
> >>>
> >>> I do not understand what could be the issue here.
> >>>
> >>> Also note I am using Kafka Streams version 0.10.0.1 and I am publishing
> >>> messages via
> >>> https://github.com/SOHU-Co/kafka-node/ whose version is quite old (0.5.x).
> >>>
> >>> Let me know if there is some bug in timestamp extraction.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>>
> >>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Sachin,
> >>>>
> >>>> This is indeed a bit weird, and we'd like to try to reproduce your issue
> >>>> locally. Do you have sample input data for us to try out?
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>> I fixed that sorted set issue but I am facing a weird problem which I am
> >>>>> not able to replicate.
> >>>>>
> >>>>> Here is the sample problem that I could isolate:
> >>>>> My class is like this:
> >>>>>     public static class Message implements Comparable<Message> {
> >>>>>         public long ts;
> >>>>>         public String message;
> >>>>>         public String key;
> >>>>>         public Message() {};
> >>>>>         public Message(long ts, String message, String key) {
> >>>>>             this.ts = ts;
> >>>>>             this.key = key;
> >>>>>             this.message = message;
> >>>>>         }
> >>>>>         public int compareTo(Message paramT) {
> >>>>>             long ts1 = paramT.ts;
> >>>>>             return ts > ts1 ? 1 : -1;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>> pipeline is like this:
> >>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>>>  .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> >>>>>      public KeyValue<String, Message> apply(String key, Message value) {
> >>>>>          return new KeyValue<String, Message>(value.key, value);
> >>>>>      }
> >>>>>  })
> >>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>     public SortedSet<Message> apply() {
> >>>>>         return new TreeSet<Message>();
> >>>>>     }
> >>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>> SortedSet<Message> aggregate) {
> >>>>>         aggregate.add(value);
> >>>>>         return aggregate;
> >>>>>     }
> >>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>> Serdes.String(), messagesSerde)
> >>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >>>>>         ...
> >>>>>     }
> >>>>> });
> >>>>>
> >>>>> So basically I rekey the original message into another topic and then
> >>>>> aggregate it based on that key.
> >>>>> What I have observed is that when I used windowed aggregation the
> >>>>> aggregator does not use previous aggregated value.
> >>>>>
> >>>>> public SortedSet<Message> apply(String aggKey, Message value,
> >>>>> SortedSet<Message> aggregate) {
> >>>>>     aggregate.add(value);
> >>>>>     return aggregate;
> >>>>> }
> >>>>>
> >>>>> So in the above function the aggregate is an empty set for every value
> >>>>> entering the pipeline. When I remove the windowed aggregation, the
> >>>>> aggregate set retains previously aggregated values in the set.
> >>>>>
> >>>>> I am just not able to wrap my head around it. When I ran this type of test
> >>>>> locally on Windows it worked fine. However, a similar pipeline setup,
> >>>>> when run against production on Linux, is behaving strangely and always
> >>>>> getting an empty aggregate set.
> >>>>> Any idea what could be the reason, and where I should look for the problem?
> >>>>> Does the length of the key string matter here? I will later try to run the
> >>>>> same simple setup on Linux and see what happens. But this is very strange
> >>>>> behavior.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello Sachin,
> >>>>>>
> >>>>>> In the implementation of SortedSet, if the object implements the
> >>>>>> Comparable interface, that compareTo function is applied in
> >>>>>> "aggregate.add(value);", and hence if it returns 0, this element will not
> >>>>>> be added since it is a Set.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>> What I find is that when I use a sorted set as the aggregate, it fails to
> >>>>>>> aggregate the values which have compareTo returning 0.
> >>>>>>>
> >>>>>>> My class is like this:
> >>>>>>>     public class Message implements Comparable<Message> {
> >>>>>>>         public long ts;
> >>>>>>>         public String message;
> >>>>>>>         public Message() {};
> >>>>>>>         public Message(long ts, String message) {
> >>>>>>>             this.ts = ts;
> >>>>>>>             this.message = message;
> >>>>>>>         }
> >>>>>>>         public int compareTo(Message paramT) {
> >>>>>>>             long ts1 = paramT.ts;
> >>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> >>>>>>>         }
> >>>>>>>     }
> >>>>>>>
> >>>>>>> pipeline is like this:
> >>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>>>     public SortedSet<Message> apply() {
> >>>>>>>         return new TreeSet<Message>();
> >>>>>>>     }
> >>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>>> SortedSet<Message> aggregate) {
> >>>>>>>         aggregate.add(value);
> >>>>>>>         return aggregate;
> >>>>>>>     }
> >>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>>>> Serdes.String(), messagesSerde)
> >>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >>>>>>>         ...
> >>>>>>>     }
> >>>>>>> });
> >>>>>>>
> >>>>>>> So any message published between 10 and 20 seconds gets aggregated in the
> >>>>>>> 10 - 20 bucket and I print the size of the set.
> >>>>>>> However output I get is following:
> >>>>>>>
> >>>>>>> Published: 14
> >>>>>>> Aggregated: 10  20 -> 1
> >>>>>>>
> >>>>>>> Published: 18
> >>>>>>> Aggregated: 10  20 -> 2
> >>>>>>>
> >>>>>>> Published: 11
> >>>>>>> Aggregated: 10  20 -> 3
> >>>>>>>
> >>>>>>> Published: 17
> >>>>>>> Aggregated: 10  20 -> 4
> >>>>>>>
> >>>>>>> Published: 14
> >>>>>>> Aggregated: 10  20 -> 4
> >>>>>>>
> >>>>>>> Published: 15
> >>>>>>> Aggregated: 10  20 -> 5
> >>>>>>>
> >>>>>>> Published: 12
> >>>>>>> Aggregated: key2  10  20 -> 6
> >>>>>>>
> >>>>>>> Published: 12
> >>>>>>> Aggregated: 10  20 -> 6
> >>>>>>>
> >>>>>>> So, as you can see, any message that occurs again for the same second,
> >>>>>>> where compareTo returns 0, fails to get aggregated in the pipeline.
> >>>>>>> Notice the ones published at 14 and 12 seconds.
> >>>>>>>
> >>>>>>> Now I am not sure if the problem is with Java, i.e. I should use the
> >>>>>>> Comparator interface and not Comparable for my Message object, or the
> >>>>>>> problem is with Kafka Streams or with serializing and de-serializing the
> >>>>>>> set of messages. If I replace Set with List all is working fine.
> >>>>>>>
> >>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see what is
> >>>>>>> the best Java practice here.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I am using kafka_2.10-0.10.0.1.
> >>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
> >>>>>>>>> If the stream app, using the timestamp extractor, puts the message in one
> >>>>>>>>> or more bucket(s), it will get aggregated in those buckets.
> >>>>>>>>> I assume this statement is correct.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Also, say, when I restart the streams application, bucket aggregation
> >>>>>>>>> will resume from the last point of halt.
> >>>>>>>>> I hope this is also correct.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> What I noticed is that once a message is placed in one bucket, that
> >>>>>>>>> bucket was not getting new messages.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> This should not happen...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> However, when I ran a small test case replicating that, it worked
> >>>>>>>>> properly. There may be some issues in application reset.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> ...and apparently it works (as expected) in your small test case.
> >>>>>>>>
> >>>>>>>> Do you have any further information that you could share with us so we
> >>>>>>>> can help you better?  What's the difference, for example, between your
> >>>>>>>> "normal" use case and the small test case you have been referring to?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Michael
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >
>
>
