I can provide the data, but the data does not seem to be the issue. If I submit the same data with the same timestamp extractor using the Java client (Kafka version 0.10.0.1), the aggregation works fine. I see the issue only when submitting the data with kafka-node version 0.5. It looks like the stream does not extract the time correctly in that case.
Thanks
Sachin

On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

> Can you provide example input data (including timestamps) and the result?
> What is the expected result (i.e., what aggregation do you apply)?
>
> -Matthias
>
> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > Hi,
> > After much debugging I found an issue with the timestamp extractor.
> >
> > If I use a custom timestamp extractor with the following code:
> >
> >     public static class MessageTimestampExtractor implements TimestampExtractor {
> >         public long extract(ConsumerRecord<Object, Object> record) {
> >             if (record.value() instanceof Message) {
> >                 return ((Message) record.value()).ts;
> >             } else {
> >                 return record.timestamp();
> >             }
> >         }
> >     }
> >
> > Here Message has a long field ts which stores the timestamp. With this
> > extractor, the aggregation does not work. Note that I have checked and
> > ts has valid timestamp values.
> >
> > However, if I replace it with, say, WallclockTimestampExtractor, the
> > aggregation works fine.
> >
> > I do not understand what the issue could be here.
> >
> > Also note that I am using Kafka Streams version 0.10.0.1 and I am
> > publishing messages via https://github.com/SOHU-Co/kafka-node/, whose
> > version is quite old (0.5.x).
> >
> > Let me know if there is some bug in timestamp extraction.
> >
> > Thanks
> > Sachin
> >
> > On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Sachin,
> >>
> >> This is indeed a bit weird, and we'd like to try to reproduce your issue
> >> locally. Do you have sample input data for us to try out?
> >>
> >> Guozhang
> >>
> >> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>
> >>> Hi,
> >>> I fixed that sorted set issue, but I am facing a weird problem which I am
> >>> not able to replicate.
> >>>
> >>> Here is the sample problem that I could isolate.
> >>> My class is like this:
> >>>
> >>>     public static class Message implements Comparable<Message> {
> >>>         public long ts;
> >>>         public String message;
> >>>         public String key;
> >>>         public Message() {}
> >>>         public Message(long ts, String message, String key) {
> >>>             this.ts = ts;
> >>>             this.key = key;
> >>>             this.message = message;
> >>>         }
> >>>         public int compareTo(Message paramT) {
> >>>             long ts1 = paramT.ts;
> >>>             return ts > ts1 ? 1 : -1;
> >>>         }
> >>>     }
> >>>
> >>> The pipeline is like this:
> >>>
> >>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>         .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> >>>             public KeyValue<String, Message> apply(String key, Message value) {
> >>>                 return new KeyValue<String, Message>(value.key, value);
> >>>             }
> >>>         })
> >>>         .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >>>         .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>             public SortedSet<Message> apply() {
> >>>                 return new TreeSet<Message>();
> >>>             }
> >>>         }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>             public SortedSet<Message> apply(String aggKey, Message value,
> >>>                                             SortedSet<Message> aggregate) {
> >>>                 aggregate.add(value);
> >>>                 return aggregate;
> >>>             }
> >>>         }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>            Serdes.String(), messagesSerde)
> >>>         .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>             public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >>>                 ...
> >>>             }
> >>>         });
> >>>
> >>> So basically I re-key the original message into another topic and then
> >>> aggregate it based on that key.
> >>> What I have observed is that when I use windowed aggregation, the
> >>> aggregator does not receive the previously aggregated value.
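A simplified in-memory model may help reason about the symptom described above. In a windowed aggregation the previous aggregate is looked up per key and per window, so the aggregator only sees an earlier set when a prior record updated the same (key, window) entry; if that lookup finds nothing, the Initializer's empty set is used instead. The class and method names below are hypothetical, window starts are passed in directly, and Long timestamps stand in for Message objects. This is a sketch of the semantics under those assumptions, not Kafka Streams' actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of windowed aggregation state (not Kafka's code):
// one aggregate is kept per (key, window start) pair.
class WindowedAggregateModel {

    private final Map<String, SortedSet<Long>> store = new HashMap<>();

    // Returns the aggregate size after adding ts to the (key, windowStart) entry.
    int aggregate(String key, long windowStart, long ts) {
        String storeKey = key + "@" + windowStart;
        SortedSet<Long> aggregate = store.get(storeKey);
        if (aggregate == null) {
            // Nothing stored for this window yet: plays the role of Initializer.apply().
            aggregate = new TreeSet<>();
        }
        aggregate.add(ts);               // plays the role of Aggregator.apply(...)
        store.put(storeKey, aggregate);  // write the updated aggregate back
        return aggregate.size();
    }

    public static void main(String[] args) {
        WindowedAggregateModel model = new WindowedAggregateModel();
        System.out.println(model.aggregate("k1", 10_000L, 14_000L)); // 1
        System.out.println(model.aggregate("k1", 10_000L, 18_000L)); // 2: same window, prior set reused
        System.out.println(model.aggregate("k1", 15_000L, 18_000L)); // 1: different window, fresh set
    }
}
```

Under this model, an aggregate that is always empty means the lookup for that (key, window) pair never finds stored state, which narrows the question to how records are being assigned to windows rather than to the aggregator code itself.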
> >>>
> >>>     public SortedSet<Message> apply(String aggKey, Message value,
> >>>                                     SortedSet<Message> aggregate) {
> >>>         aggregate.add(value);
> >>>         return aggregate;
> >>>     }
> >>>
> >>> So in the above function, the aggregate is an empty set for every value
> >>> entering the pipeline. When I remove the windowed aggregation, the
> >>> aggregate set retains the previously aggregated values.
> >>>
> >>> I just cannot wrap my head around it. When I ran this type of test
> >>> locally on Windows, it worked fine. However, a similar pipeline set up
> >>> against production on Linux behaves strangely and always gets an empty
> >>> aggregate set.
> >>> Any idea what the reason could be, or where I should look for the
> >>> problem? Does the length of the key string matter here? I will later try
> >>> to run the same simple setup on Linux and see what happens. But this is
> >>> very strange behavior.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Sachin,
> >>>>
> >>>> In the SortedSet implementation (TreeSet), if the object implements the
> >>>> Comparable interface, its compareTo function is applied in
> >>>> "aggregate.add(value);", and hence if it returns 0, the element will
> >>>> not be added, since it is a Set.
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>> What I find is that when I use a sorted set as the aggregate, it fails
> >>>>> to aggregate the values for which compareTo returns 0.
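To make Guozhang's point concrete, here is a small standalone sketch that does not use Kafka. A TreeSet derives both ordering and equality from compareTo (or from a Comparator passed to its constructor), so two messages with the same ts are treated as one element. Making the comparison never return 0 keeps both, but it violates the Comparator contract and breaks lookups such as contains(). Breaking ties on a second field keeps distinct messages and respects the contract. The two-field Message class is a hypothetical stand-in for the one in this thread, and the comparators are illustrative choices. Whether Comparable or Comparator is used does not change this; what matters is when the comparison returns 0.

```java
import java.util.Comparator;
import java.util.TreeSet;

class SortedSetTies {

    // Hypothetical stand-in for the thread's Message class.
    static final class Message {
        final long ts;
        final String message;
        Message(long ts, String message) { this.ts = ts; this.message = message; }
    }

    // Orders by ts only: equal ts compares as 0, so TreeSet drops the second message.
    static final Comparator<Message> BY_TS_ONLY =
            Comparator.comparingLong((Message m) -> m.ts);

    // Orders by ts, then by message text, so only identical (ts, message) pairs tie.
    static final Comparator<Message> BY_TS_THEN_MESSAGE =
            BY_TS_ONLY.thenComparing(m -> m.message);

    // Never returns 0 (like "ts > ts1 ? 1 : -1"): keeps ties but breaks the
    // contract, because compare(a, b) and compare(b, a) are both -1 for equal ts.
    static final Comparator<Message> NEVER_ZERO =
            (a, b) -> a.ts > b.ts ? 1 : -1;

    public static void main(String[] args) {
        Message a = new Message(14_000L, "first");
        Message b = new Message(14_000L, "second");

        TreeSet<Message> byTs = new TreeSet<>(BY_TS_ONLY);
        byTs.add(a);
        System.out.println(byTs.add(b));            // false: b ties with a
        System.out.println(byTs.size());            // 1

        TreeSet<Message> tieBroken = new TreeSet<>(BY_TS_THEN_MESSAGE);
        tieBroken.add(a);
        tieBroken.add(b);
        System.out.println(tieBroken.size());       // 2
        System.out.println(tieBroken.contains(a));  // true

        TreeSet<Message> neverZero = new TreeSet<>(NEVER_ZERO);
        neverZero.add(a);
        neverZero.add(b);
        System.out.println(neverZero.size());       // 2
        System.out.println(neverZero.contains(a));  // false: lookup never finds a match
    }
}
```

If two messages can legitimately share both ts and text, the tie-break needs a field that is unique per message (for example an ID, which the Message class in this thread does not have); otherwise they will still collapse into one entry.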
> >>>>>
> >>>>> My class is like this:
> >>>>>
> >>>>>     public class Message implements Comparable<Message> {
> >>>>>         public long ts;
> >>>>>         public String message;
> >>>>>         public Message() {}
> >>>>>         public Message(long ts, String message) {
> >>>>>             this.ts = ts;
> >>>>>             this.message = message;
> >>>>>         }
> >>>>>         public int compareTo(Message paramT) {
> >>>>>             long ts1 = paramT.ts;
> >>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>> The pipeline is like this:
> >>>>>
> >>>>>     builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >>>>>         .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >>>>>             public SortedSet<Message> apply() {
> >>>>>                 return new TreeSet<Message>();
> >>>>>             }
> >>>>>         }, new Aggregator<String, Message, SortedSet<Message>>() {
> >>>>>             public SortedSet<Message> apply(String aggKey, Message value,
> >>>>>                                             SortedSet<Message> aggregate) {
> >>>>>                 aggregate.add(value);
> >>>>>                 return aggregate;
> >>>>>             }
> >>>>>         }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >>>>>            Serdes.String(), messagesSerde)
> >>>>>         .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >>>>>             public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >>>>>                 ...
> >>>>>             }
> >>>>>         });
> >>>>>
> >>>>> So any message published between 10 and 20 seconds gets aggregated in
> >>>>> the 10 - 20 bucket, and I print the size of the set.
> >>>>> However, the output I get is the following:
> >>>>>
> >>>>>     Published: 14
> >>>>>     Aggregated: 10 20 -> 1
> >>>>>
> >>>>>     Published: 18
> >>>>>     Aggregated: 10 20 -> 2
> >>>>>
> >>>>>     Published: 11
> >>>>>     Aggregated: 10 20 -> 3
> >>>>>
> >>>>>     Published: 17
> >>>>>     Aggregated: 10 20 -> 4
> >>>>>
> >>>>>     Published: 14
> >>>>>     Aggregated: 10 20 -> 4
> >>>>>
> >>>>>     Published: 15
> >>>>>     Aggregated: 10 20 -> 5
> >>>>>
> >>>>>     Published: 12
> >>>>>     Aggregated: key2 10 20 -> 6
> >>>>>
> >>>>>     Published: 12
> >>>>>     Aggregated: 10 20 -> 6
> >>>>>
> >>>>> So whenever a message occurs again in the same second, where compareTo
> >>>>> returns 0, it fails to get aggregated in the pipeline. Notice the ones
> >>>>> published at 14 and 12 seconds.
> >>>>>
> >>>>> Now I am not sure whether the problem is with Java, i.e. I should use
> >>>>> the Comparator interface and not Comparable for my Message object, or
> >>>>> whether the problem is with Kafka Streams or with serializing and
> >>>>> deserializing the set of messages. If I replace the Set with a List,
> >>>>> everything works fine.
> >>>>>
> >>>>> Anyway, any ideas here would be appreciated; meanwhile, let me see what
> >>>>> the best Java practice is here.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
> >>>>>
> >>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>>>>
> >>>>>>> I am using kafka_2.10-0.10.0.1.
> >>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
> >>>>>>> If the stream app, using the timestamp extractor, puts the message in
> >>>>>>> one or more bucket(s), it will get aggregated in those buckets.
> >>>>>>> I assume this statement is correct.
> >>>>>>>
> >>>>>>
> >>>>>> Yes.
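The bucket assignment confirmed above can be computed directly. With hopping windows, every window start is a multiple of the advance, and a record belongs to each window [start, start + size) that contains its timestamp; away from time 0, and when the size is a multiple of the advance, that is size / advance windows per record. The helper name windowStartsFor is hypothetical; this sketch follows the hopping-window definition and is not copied from Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindows {

    // Start times of every hopping window [start, start + sizeMs) that contains
    // the timestamp. Starts are non-negative multiples of advanceMs.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of advanceMs whose window still ends after the timestamp.
        long start = Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10 s windows advancing by 5 s: a record at 14 s is in [5 s, 15 s) and [10 s, 20 s).
        System.out.println(windowStartsFor(14_000L, 10_000L, 5_000L)); // [5000, 10000]

        // 60 min windows advancing by 15 min: a record at minute 100 is in four windows.
        long minute = 60_000L;
        System.out.println(windowStartsFor(100 * minute, 60 * minute, 15 * minute).size()); // 4
    }
}
```

So with the 10 s / 5 s windows used earlier in the thread, a message published at 14 s counts toward both the 5 to 15 s and the 10 to 20 s buckets, not only the latter.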
> >>>>>>
> >>>>>>> Also, say when I restart the streams application, the bucket
> >>>>>>> aggregation will resume from the last point of halt.
> >>>>>>> I hope this is also correct.
> >>>>>>>
> >>>>>>
> >>>>>> Yes.
> >>>>>>
> >>>>>>> What I noticed is that once a message was placed in one bucket, that
> >>>>>>> bucket was not getting new messages.
> >>>>>>>
> >>>>>>
> >>>>>> This should not happen...
> >>>>>>
> >>>>>>> However, when I ran a small test case replicating that, it worked
> >>>>>>> properly. There may be some issue with the application reset.
> >>>>>>>
> >>>>>>
> >>>>>> ...and apparently it works (as expected) in your small test case.
> >>>>>>
> >>>>>> Do you have any further information that you could share with us so we
> >>>>>> can help you better? What's the difference, for example, between your
> >>>>>> "normal" use case and the small test case you have been referring to?
> >>>>>>
> >>>>>> -Michael
> >>>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >> --
> >> -- Guozhang
> >>
> >