https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am using; the version is 0.5.x. Can you please tell me which code in Streams calls the timestamp extractor? I can look there to see if there is any issue.
Again, the issue happens only when producing the messages with a producer that is compatible with Kafka version 0.8.x. I see that this producer does not send a record timestamp, since record timestamps were only introduced in version 0.10.

Thanks
Sachin

On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

> I am not sure what is happening. That's why it would be good to have a
> toy example to reproduce the issue.
>
> What do you mean by "Kafka node version 0.5"?
>
> -Matthias
>
> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> > I can provide the data, but the data does not seem to be the issue.
> > If I submit the same data and use the same timestamp extractor with the Java
> > client and Kafka version 0.10.0.1, aggregation works fine.
> > I find the issue only when submitting the data with kafka node version 0.5.
> > It looks like the stream does not extract the time correctly in that case.
> >
> > Thanks
> > Sachin
> >
> > On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
> >
> >> Can you provide example input data (including timestamps) and the result?
> >> What is the expected result (i.e., what aggregation do you apply)?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> After much debugging I found an issue with the timestamp extractor.
> >>>
> >>> If I use a custom timestamp extractor with the following code:
> >>> public static class MessageTimestampExtractor implements TimestampExtractor {
> >>>     public long extract(ConsumerRecord<Object, Object> record) {
> >>>         if (record.value() instanceof Message) {
> >>>             return ((Message) record.value()).ts;
> >>>         } else {
> >>>             return record.timestamp();
> >>>         }
> >>>     }
> >>> }
> >>>
> >>> Here Message has a long field ts which stores the timestamp; with this
> >>> extractor the aggregation does not work.
> >>> Note I have checked and ts has valid timestamp values.
> >>>
> >>> However, if I replace it with, say, WallclockTimestampExtractor, aggregation is
> >>> working fine.
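The thread's working hypothesis is that records written by the 0.8-compatible kafka-node producer carry no record timestamp (a Kafka 0.10 consumer typically reports such records with a timestamp of -1). As a hedged illustration, here is a minimal, self-contained sketch of an extractor that never hands a negative timestamp to the windowing logic. To keep it runnable without Kafka on the classpath, `RecordView` and `MessageStub` are hypothetical stand-ins for `ConsumerRecord` and the thread's `Message` class, and `SafeMessageTimestampExtractor` is an illustrative name; in a 0.10.0.x application the same checks would sit inside `TimestampExtractor.extract(ConsumerRecord<Object, Object>)`. The wall-clock fallback mimics what `WallclockTimestampExtractor` returns.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the thread's Message payload; only ts matters here.
class MessageStub {
    final long ts;

    MessageStub(long ts) {
        this.ts = ts;
    }
}

// Hypothetical stand-in for ConsumerRecord: just the value and the record timestamp.
class RecordView {
    final Object value;
    final long timestamp; // assumed -1 when the producer used the pre-0.10 message format

    RecordView(Object value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

class SafeMessageTimestampExtractor {
    private final LongSupplier wallClock;

    SafeMessageTimestampExtractor(LongSupplier wallClock) {
        this.wallClock = wallClock;
    }

    long extract(RecordView record) {
        // 1. Prefer the event time embedded in the payload, if it is non-negative.
        if (record.value instanceof MessageStub) {
            long ts = ((MessageStub) record.value).ts;
            if (ts >= 0) {
                return ts;
            }
        }
        // 2. Otherwise use the record timestamp, but only if the producer set one.
        if (record.timestamp >= 0) {
            return record.timestamp;
        }
        // 3. Last resort: processing time, as WallclockTimestampExtractor would return.
        return wallClock.getAsLong();
    }
}
```

Injecting the clock as a `LongSupplier` keeps the fallback testable; in production it would simply be `System::currentTimeMillis`. This sketch only rules out the missing-timestamp case; on its own it does not explain why valid `ts` values failed to aggregate.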
> >>>
> >>> I do not understand what could be the issue here.
> >>>
> >>> Also note I am using Kafka Streams version 0.10.0.1 and I am publishing
> >>> messages via https://github.com/SOHU-Co/kafka-node/ whose version is quite old (0.5.x).
> >>>
> >>> Let me know if there is some bug in timestamp extraction.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>>
> >>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Sachin,
> >>>>
> >>>> This is indeed a bit weird, and we'd like to try to reproduce your issue
> >>>> locally. Do you have sample input data for us to try out?
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>> I fixed that sorted set issue but I am facing a weird problem which I am
> >>>>> not able to replicate.
> >>>>>
> >>>>> Here is the sample problem that I could isolate:
> >>>>> My class is like this:
> >>>>> public static class Message implements Comparable<Message> {
> >>>>>     public long ts;
> >>>>>     public String message;
> >>>>>     public String key;
> >>>>>     public Message() {};
> >>>>>     public Message(long ts, String message, String key) {
> >>>>>         this.ts = ts;
> >>>>>         this.key = key;
> >>>>>         this.message = message;
> >>>>>     }
> >>>>>     public int compareTo(Message paramT) {
> >>>>>         long ts1 = paramT.ts;
> >>>>>         return ts > ts1 ? 1 : -1;
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> The pipeline is like this:
> >>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>>> .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> >>>>>     public KeyValue<String, Message> apply(String key, Message value) {
> >>>>>         return new KeyValue<String, Message>(value.key, value);
> >>>>>     }
> >>>>> })
> >>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>     public SortedSet<Message> apply() {
> >>>>>         return new TreeSet<Message>();
> >>>>>     }
> >>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>             SortedSet<Message> aggregate) {
> >>>>>         aggregate.add(value);
> >>>>>         return aggregate;
> >>>>>     }
> >>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>> Serdes.String(), messagesSerde)
> >>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >>>>>         ...
> >>>>>     }
> >>>>> });
> >>>>>
> >>>>> So basically I rekey the original message into another topic and then
> >>>>> aggregate it based on that key.
> >>>>> What I have observed is that when I use windowed aggregation the
> >>>>> aggregator does not use the previously aggregated value.
> >>>>>
> >>>>> public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>         SortedSet<Message> aggregate) {
> >>>>>     aggregate.add(value);
> >>>>>     return aggregate;
> >>>>> }
> >>>>>
> >>>>> So in the above function the aggregate is an empty set for every value
> >>>>> entering the pipeline. When I remove the windowed aggregation, the
> >>>>> aggregate set retains previously aggregated values in the set.
> >>>>>
> >>>>> I am just not able to wrap my head around it.
> >>>>> When I ran this type of test locally on Windows it is working fine.
> >>>>> However, a similar pipeline setup, when run against production on Linux,
> >>>>> is behaving strangely and always getting an empty aggregate set.
> >>>>> Any idea what could be the reason, and where should I look for the problem?
> >>>>> Does the length of the key string matter here? I will later try to run the
> >>>>> same simple setup on Linux and see what happens. But this is very strange behavior.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello Sachin,
> >>>>>>
> >>>>>> In the implementation of SortedSet, if the object implements the
> >>>>>> Comparable interface, that compareTo function is applied in
> >>>>>> "aggregate.add(value);", and hence if it returns 0, this element will not
> >>>>>> be added since it is a Set.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>> What I find is that when I use a sorted set as the aggregation it fails to
> >>>>>>> aggregate the values for which compareTo returns 0.
> >>>>>>>
> >>>>>>> My class is like this:
> >>>>>>> public class Message implements Comparable<Message> {
> >>>>>>>     public long ts;
> >>>>>>>     public String message;
> >>>>>>>     public Message() {};
> >>>>>>>     public Message(long ts, String message) {
> >>>>>>>         this.ts = ts;
> >>>>>>>         this.message = message;
> >>>>>>>     }
> >>>>>>>     public int compareTo(Message paramT) {
> >>>>>>>         long ts1 = paramT.ts;
> >>>>>>>         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> >>>>>>>     }
> >>>>>>> }
> >>>>>>>
> >>>>>>> The pipeline is like this:
> >>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>>>     public SortedSet<Message> apply() {
> >>>>>>>         return new TreeSet<Message>();
> >>>>>>>     }
> >>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>>>             SortedSet<Message> aggregate) {
> >>>>>>>         aggregate.add(value);
> >>>>>>>         return aggregate;
> >>>>>>>     }
> >>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>>>> Serdes.String(), messagesSerde)
> >>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >>>>>>>         ...
> >>>>>>>     }
> >>>>>>> });
> >>>>>>>
> >>>>>>> So any message published between 10 and 20 seconds gets aggregated in the
> >>>>>>> 10 - 20 bucket, and I print the size of the set.
> >>>>>>> However, the output I get is the following:
> >>>>>>>
> >>>>>>> Published: 14
> >>>>>>> Aggregated: 10 20 -> 1
> >>>>>>>
> >>>>>>> Published: 18
> >>>>>>> Aggregated: 10 20 -> 2
> >>>>>>>
> >>>>>>> Published: 11
> >>>>>>> Aggregated: 10 20 -> 3
> >>>>>>>
> >>>>>>> Published: 17
> >>>>>>> Aggregated: 10 20 -> 4
> >>>>>>>
> >>>>>>> Published: 14
> >>>>>>> Aggregated: 10 20 -> 4
> >>>>>>>
> >>>>>>> Published: 15
> >>>>>>> Aggregated: 10 20 -> 5
> >>>>>>>
> >>>>>>> Published: 12
> >>>>>>> Aggregated: key2 10 20 -> 6
> >>>>>>>
> >>>>>>> Published: 12
> >>>>>>> Aggregated: 10 20 -> 6
> >>>>>>>
> >>>>>>> So, as you can see, any message that occurs again for the same second, where
> >>>>>>> compareTo returns 0, fails to get aggregated in the pipeline.
> >>>>>>> Notice the ones published at 14 and 12 seconds.
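Guozhang's explanation is easy to check outside Kafka Streams: a `TreeSet` uses the ordering, not `equals`, to detect duplicates, so any two elements that compare as 0 collapse into one. The minimal sketch below (class names are illustrative) contrasts a timestamp-only ordering with a total ordering that breaks ties on the `key` and `message` fields from the thread's class. A tie-breaking comparator is generally safer than a `compareTo` that never returns 0, which breaks the `Comparable` contract (for example, `x.compareTo(x)` would return -1). It assumes `key` and `message` are non-null; messages identical in every field would still collapse, so a `List` (as Sachin found) or an extra sequence field would be needed to keep those.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;

// Illustrative version of the thread's Message class.
class Message {
    final long ts;
    final String message;
    final String key;

    Message(long ts, String message, String key) {
        this.ts = ts;
        this.message = message;
        this.key = key;
    }
}

class MessageOrdering {
    // Timestamp-only ordering: to a TreeSet, equal ts means "same element".
    static final Comparator<Message> BY_TS_ONLY = Comparator.comparingLong(m -> m.ts);

    // Total ordering: ts first, then key and message text as tie-breakers (assumed non-null).
    static final Comparator<Message> BY_TS_THEN_FIELDS =
            Comparator.<Message>comparingLong(m -> m.ts)
                    .thenComparing(m -> m.key)
                    .thenComparing(m -> m.message);

    static int countDistinct(Comparator<Message> order, Message... messages) {
        Set<Message> set = new TreeSet<>(order);
        for (Message m : messages) {
            // add() returns false and drops m if the ordering says it equals an existing element
            set.add(m);
        }
        return set.size();
    }
}
```

With two messages at the same second and one at another, the timestamp-only ordering keeps 2 of the 3, which matches the stalled counts at 14 s and 12 s above, while the tie-breaking ordering keeps all 3.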
> >>>>>>>
> >>>>>>> Now I am not sure if the problem is with Java, i.e. I should use the Comparator
> >>>>>>> interface and not Comparable for my Message object, or whether the problem is
> >>>>>>> with Kafka Streams or with serializing and deserializing the set of messages.
> >>>>>>> If I replace Set with List, all is working fine.
> >>>>>>>
> >>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see what is
> >>>>>>> the best Java practice here.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> I am using kafka_2.10-0.10.0.1.
> >>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
> >>>>>>>>> If the stream app, using the timestamp extractor, puts the message in one
> >>>>>>>>> or more bucket(s), it will get aggregated in those buckets.
> >>>>>>>>> I assume this statement is correct.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Also, say when I restart the streams application, the bucket aggregation
> >>>>>>>>> will resume from the last point of halt.
> >>>>>>>>> I hope this is also correct.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> What I noticed is that once a message is placed in one bucket, that bucket
> >>>>>>>>> was not getting new messages.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> This should not happen...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> However, when I ran a small test case replicating that, it is working
> >>>>>>>>> properly. There may be some issues with application reset.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> ...and apparently it works (as expected) in your small test case.
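To make the "one or more bucket(s)" answer concrete, the sketch below computes which hopping windows contain a given timestamp. It assumes, as Kafka's `TimeWindows` does, that window start times are aligned to the epoch (multiples of the advance interval) and that each window covers `[start, start + size)`; `HoppingWindows.windowStartsFor` is an illustrative helper, not a Kafka API. With a 60-minute window advancing by 15 minutes, a record at minute 70 falls into the four windows starting at minutes 15, 30, 45 and 60.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindows {
    // Returns the start times of all windows [start, start + sizeMs) that contain timestampMs.
    // Assumption: window starts are epoch-aligned multiples of advanceMs, and none is negative.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp (clamped at 0).
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

Applied to the 10-second windows advancing by 5 seconds used elsewhere in the thread, a record at 14 s lands in both the 5-15 s and 10-20 s windows, and each of those windows keeps its own, independent aggregate.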
> >>>>>>>>
> >>>>>>>> Do you have any further information that you could share with us so we can
> >>>>>>>> help you better? What's the difference, for example, between your "normal"
> >>>>>>>> use case and the small test case you have been referring to?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Michael
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
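Relatedly, the windowed symptom from the Nov 25 message ("the aggregate is an empty set for every value") is what one would see if each record landed in a window that had no state yet, since a windowed aggregate is kept separately per key and window. The following in-memory model is a simplified sketch with illustrative names, not how Kafka's window stores are implemented. It uses a `List` aggregate and the same epoch-aligned hopping rule, and reports the aggregate size each window had before the record was added. It illustrates one possible explanation (extracted timestamps that are far apart, or not the ones expected), not a confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowedListAggregator {
    private final long sizeMs;
    private final long advanceMs;
    // Simplified state: "key@windowStart" -> aggregate (a List, so equal timestamps are kept).
    private final Map<String, List<String>> state = new HashMap<>();

    WindowedListAggregator(long sizeMs, long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }

    // Adds value to every hopping window containing timestampMs and returns, per window
    // (in start order), how many values that window held before this add.
    List<Integer> add(String key, String value, long timestampMs) {
        List<Integer> previousSizes = new ArrayList<>();
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            // A window seen for the first time starts from the initializer's empty aggregate.
            List<String> agg = state.computeIfAbsent(key + "@" + start, k -> new ArrayList<>());
            previousSizes.add(agg.size());
            agg.add(value); // the aggregator step
        }
        return previousSizes;
    }
}
```

With 10 s windows advancing by 5 s, a record at 14 s followed by one at 18 s shares the 10-20 s window (the second record sees one earlier value there), whereas a record whose extracted timestamp is far away starts from empty aggregates in both of its windows.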