The timestamp extractor is invoked in org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords().
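As a rough illustration only (a sketch, not the actual RecordQueue code and not a fix confirmed in this thread): records written by a pre-0.10, 0.8-compatible producer carry no record timestamp, so record.timestamp() can come back negative, and a defensive variant of the custom extractor discussed below could fall back to wall-clock time instead of handing that invalid value to Streams. The class name SafeMessageTimestampExtractor and the fallback choice are invented for the example; it assumes the Message class shown further down the thread.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Illustrative sketch only: like the extractor below, but guards against
    // records that carry no timestamp (record.timestamp() can be negative for
    // records written by pre-0.10 producers such as the old kafka-node client).
    public class SafeMessageTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof Message) {
                return ((Message) record.value()).ts;          // event time embedded in the payload
            }
            long ts = record.timestamp();
            return ts >= 0 ? ts : System.currentTimeMillis();  // fall back instead of returning -1
        }
    }

Whether falling back to wall-clock time (or instead failing fast) is the right behaviour is an application decision; the point is only that a negative timestamp should not be passed on silently.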
Let us know if you could resolve the problem or need more help.

-Matthias

On 12/2/16 11:46 AM, Sachin Mittal wrote:

https://github.com/SOHU-Co/kafka-node/ is the Node.js client I am using; the version is 0.5.x. Can you please tell me what code in Streams calls the timestamp extractor? I can look there to see if there is any issue.

Again, the issue happens only when producing the messages with a producer that is compatible with Kafka version 0.8.x. I see that this producer does not send a record timestamp, as record timestamps were only introduced in version 0.10.

Thanks
Sachin

On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

I am not sure what is happening. That's why it would be good to have a toy example to reproduce the issue.

What do you mean by "Kafka node version 0.5"?

-Matthias

On 12/2/16 11:30 AM, Sachin Mittal wrote:

I can provide the data, but the data does not seem to be the issue. If I submit the same data and use the same timestamp extractor with the Java client and Kafka version 0.10.0.1, aggregation works fine. I see the issue only when submitting the data with kafka-node version 0.5. It looks like the stream does not extract the time correctly in that case.

Thanks
Sachin

On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:

Can you provide example input data (including timestamps) and the result? What is the expected result (i.e., what aggregation do you apply)?

-Matthias

On 12/2/16 7:43 AM, Sachin Mittal wrote:

Hi,
After much debugging I found an issue with the timestamp extractor.

If I use a custom timestamp extractor with the following code:

    public static class MessageTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof Message) {
                return ((Message) record.value()).ts;
            } else {
                return record.timestamp();
            }
        }
    }

where Message has a long field ts that stores the timestamp, the aggregation does not work. Note that I have checked and ts holds valid timestamp values.

However, if I replace it with, say, WallclockTimestampExtractor, aggregation works fine.

I do not understand what the issue could be here.

Also note that I am using Kafka Streams version 0.10.0.1 and I am publishing messages via https://github.com/SOHU-Co/kafka-node/, whose version is quite old, 0.5.x.

Let me know if there is some bug in timestamp extraction.

Thanks
Sachin

On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Sachin,

This is indeed a bit weird, and we'd like to try to reproduce your issue locally. Do you have sample input data for us to try out?

Guozhang

On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

Hi,
I fixed that sorted set issue, but I am facing a weird problem which I am not able to replicate.
Here is the sample problem that I could isolate.
My class is like this:

    public static class Message implements Comparable<Message> {
        public long ts;
        public String message;
        public String key;
        public Message() {};
        public Message(long ts, String message, String key) {
            this.ts = ts;
            this.key = key;
            this.message = message;
        }
        public int compareTo(Message paramT) {
            long ts1 = paramT.ts;
            return ts > ts1 ? 1 : -1;
        }
    }

The pipeline is like this:

    builder.stream(Serdes.String(), messageSerde, "test-window-stream")
        .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
            public KeyValue<String, Message> apply(String key, Message value) {
                return new KeyValue<String, Message>(value.key, value);
            }
        })
        .through(Serdes.String(), messageSerde, "test-window-key-stream")
        .aggregateByKey(new Initializer<SortedSet<Message>>() {
            public SortedSet<Message> apply() {
                return new TreeSet<Message>();
            }
        }, new Aggregator<String, Message, SortedSet<Message>>() {
            public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
                aggregate.add(value);
                return aggregate;
            }
        }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
           Serdes.String(), messagesSerde)
        .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
            public void apply(Windowed<String> key, SortedSet<Message> messages) {
                ...
            }
        });

So basically I re-key the original message into another topic and then aggregate it based on that key. What I have observed is that when I use windowed aggregation, the aggregator does not use the previously aggregated value:

    public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
        aggregate.add(value);
        return aggregate;
    }

In the above function the aggregate is an empty set for every value entering the pipeline. When I remove the windowed aggregation, the aggregate set retains the previously aggregated values.

I am just not able to wrap my head around it. When I ran this type of test locally on Windows it worked fine. However, a similar pipeline setup, when run against production on Linux, behaves strangely and always gets an empty aggregate set. Any idea what the reason could be, and where I should look for the problem? Does the length of the key string matter here? I will later try to run the same simple setup on Linux and see what happens. But this is very strange behavior.

Thanks
Sachin

On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Sachin,

In the implementation of SortedSet, if the object implements the Comparable interface, that compareTo function is applied in "aggregate.add(value);", and hence if it returns 0 the element will not be added, since it is a Set.

Guozhang
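One possible way around that behaviour, sketched here purely as an illustration (the class name ComparableMessage is invented, and whether messages with identical timestamps should be kept at all is an application decision not settled in this thread): break ties on a secondary field so that two different messages with the same ts never compare as 0, and keep equals/hashCode consistent with that ordering.

    import java.util.Objects;

    // Illustrative variant of the Message class from the thread, with a
    // tie-breaking compareTo (plus a consistent equals/hashCode) so that two
    // different messages with the same timestamp are both kept by a TreeSet.
    public class ComparableMessage implements Comparable<ComparableMessage> {
        public long ts;
        public String message;

        public ComparableMessage(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }

        public int compareTo(ComparableMessage other) {
            int byTs = Long.compare(ts, other.ts);
            return byTs != 0 ? byTs : message.compareTo(other.message); // secondary key breaks the tie
        }

        public boolean equals(Object o) {
            if (!(o instanceof ComparableMessage)) return false;
            ComparableMessage m = (ComparableMessage) o;
            return ts == m.ts && Objects.equals(message, m.message);
        }

        public int hashCode() {
            return Objects.hash(ts, message);
        }
    }

An alternative with the same effect would be to pass an explicit Comparator to the TreeSet created in the Initializer; either way the serde still has to round-trip the set correctly.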
On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

Hi,
What I find is that when I use a sorted set as the aggregation, it fails to aggregate values for which compareTo returns 0.

My class is like this:

    public class Message implements Comparable<Message> {
        public long ts;
        public String message;
        public Message() {};
        public Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }
        public int compareTo(Message paramT) {
            long ts1 = paramT.ts;
            return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
        }
    }

The pipeline is like this:

    builder.stream(Serdes.String(), messageSerde, "test-window-stream")
        .aggregateByKey(new Initializer<SortedSet<Message>>() {
            public SortedSet<Message> apply() {
                return new TreeSet<Message>();
            }
        }, new Aggregator<String, Message, SortedSet<Message>>() {
            public SortedSet<Message> apply(String aggKey, Message value, SortedSet<Message> aggregate) {
                aggregate.add(value);
                return aggregate;
            }
        }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
           Serdes.String(), messagesSerde)
        .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
            public void apply(Windowed<String> key, SortedSet<Message> messages) {
                ...
            }
        });

So any message published between 10 and 20 seconds gets aggregated into the 10 - 20 bucket, and I print the size of the set. However, the output I get is the following:

    Published: 14
    Aggregated: 10 20 -> 1

    Published: 18
    Aggregated: 10 20 -> 2

    Published: 11
    Aggregated: 10 20 -> 3

    Published: 17
    Aggregated: 10 20 -> 4

    Published: 14
    Aggregated: 10 20 -> 4

    Published: 15
    Aggregated: 10 20 -> 5

    Published: 12
    Aggregated: key2 10 20 -> 6

    Published: 12
    Aggregated: 10 20 -> 6

So, as you can see, any message that occurs again within the same second, where compareTo returns 0, fails to get aggregated in the pipeline. Notice the ones published at seconds 14 and 12.

Now I am not sure whether the problem is with Java, i.e. whether I should use the Comparator interface rather than Comparable for my Message object, or whether the problem is with Kafka Streams or with serializing and de-serializing the set of messages. If I replace the Set with a List, everything works fine.

Anyway, any ideas here would be appreciated; meanwhile let me see what the best Java practice is here.

Thanks
Sachin
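A small stand-alone demonstration of the behaviour described above, with no Kafka involved (the class name SetVsListDemo and the sample values are invented for the example; it assumes the two-argument Message constructor shown in the message above): TreeSet.add() returns false when compareTo() yields 0 against an existing element, while a List keeps both entries.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;

    // Why the repeated messages at seconds 14 and 12 above were not counted:
    // the TreeSet treats a compareTo() result of 0 as "already present".
    public class SetVsListDemo {
        public static void main(String[] args) {
            TreeSet<Message> set = new TreeSet<Message>();
            List<Message> list = new ArrayList<Message>();

            Message first = new Message(14000L, "first event in second 14");
            Message second = new Message(14000L, "second event in second 14"); // same ts -> compareTo == 0

            set.add(first);
            boolean added = set.add(second);  // false: rejected as a duplicate
            list.add(first);
            list.add(second);

            System.out.println("TreeSet size = " + set.size() + " (second add returned " + added + ")");
            System.out.println("List size    = " + list.size());
        }
    }

Run as-is, it should print "TreeSet size = 1 (second add returned false)" and "List size = 2", matching the counts observed in the log above.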
On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:

On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> I am using kafka_2.10-0.10.0.1.
> Say I am having a window of 60 minutes advanced by 15 minutes.
> If the stream app, using the timestamp extractor, puts the message in one or more bucket(s), it will get aggregated in those buckets.
> I assume this statement is correct.

Yes.

> Also, say when I restart the streams application, then bucket aggregation will resume from the last point of halt.
> I hope this is also correct.

Yes.

> What I noticed is that once a message is placed in one bucket, that bucket was not getting new messages.

This should not happen...

> However, when I ran a small test case replicating that, it is working properly. There may be some issues in the application reset.

...and apparently it works (as expected) in your small test case.

Do you have any further information that you could share with us so we can help you better? What's the difference, for example, between your "normal" use case and the small test case you have been referring to?

-Michael
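For completeness, the hopping-window bucketing discussed above can be checked with a small stand-alone calculation. This is only a rough sketch of the assignment rule (a record belongs to every window whose start lies in (ts - size, ts], with starts spaced by the advance interval), not the actual TimeWindows implementation, and the 70-minute example timestamp is invented:

    // With a 60-minute window advanced by 15 minutes, a record at minute 70
    // falls into the four windows starting at minutes 15, 30, 45 and 60.
    public class WindowBuckets {
        public static void main(String[] args) {
            long sizeMs = 60 * 60 * 1000L;       // 60-minute window
            long advanceMs = 15 * 60 * 1000L;    // advanced by 15 minutes
            long ts = 70 * 60 * 1000L;           // example record timestamp: minute 70

            long start = ts - (ts % advanceMs);  // latest window start at or before ts
            for (; start > ts - sizeMs && start >= 0; start -= advanceMs) {
                System.out.println("window [minute " + start / 60000
                        + ", minute " + (start + sizeMs) / 60000 + ")");
            }
        }
    }

It prints the four windows [60, 120), [45, 105), [30, 90) and [15, 75), i.e. size/advance buckets per record.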