Can you provide example input data (including timestamps) and the result you observe? What is the expected result (i.e., what aggregation do you apply)?
-Matthias

On 12/2/16 7:43 AM, Sachin Mittal wrote:
> Hi,
> After much debugging I found an issue with the timestamp extractor.
>
> If I use a custom timestamp extractor with the following code:
>
> public static class MessageTimestampExtractor implements
>         TimestampExtractor {
>     public long extract(ConsumerRecord<Object, Object> record) {
>         if (record.value() instanceof Message) {
>             return ((Message) record.value()).ts;
>         } else {
>             return record.timestamp();
>         }
>     }
> }
>
> Here the message has a long field ts which stores the timestamp, and the
> aggregation does not work.
> Note I have checked, and ts has valid timestamp values.
>
> However if I replace it with, say, WallclockTimestampExtractor, the
> aggregation works fine.
>
> I do not understand what the issue could be here.
>
> Also note I am using kafka streams version 0.10.0.1 and I am publishing
> messages via https://github.com/SOHU-Co/kafka-node/ whose version is
> quite old, 0.5.x.
>
> Let me know if there is some bug in timestamp extraction.
>
> Thanks
> Sachin
>
>
> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Sachin,
>>
>> This is indeed a bit weird, and we'd like to try to reproduce your
>> issue locally. Do you have sample input data for us to try out?
>>
>> Guozhang
>>
>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I fixed that sorted set issue, but I am facing a weird problem which
>>> I am not able to replicate.
>>>
>>> Here is the sample problem that I could isolate:
>>> My class is like this:
>>>
>>> public static class Message implements Comparable<Message> {
>>>     public long ts;
>>>     public String message;
>>>     public String key;
>>>     public Message() {};
>>>     public Message(long ts, String message, String key) {
>>>         this.ts = ts;
>>>         this.key = key;
>>>         this.message = message;
>>>     }
>>>     public int compareTo(Message paramT) {
>>>         long ts1 = paramT.ts;
>>>         return ts > ts1 ? 1 : -1;
>>>     }
>>> }
>>>
>>> The pipeline is like this:
>>>
>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>         public KeyValue<String, Message> apply(String key, Message value) {
>>>             return new KeyValue<String, Message>(value.key, value);
>>>         }
>>>     })
>>>     .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>         public SortedSet<Message> apply() {
>>>             return new TreeSet<Message>();
>>>         }
>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>         public SortedSet<Message> apply(String aggKey, Message value,
>>>                 SortedSet<Message> aggregate) {
>>>             aggregate.add(value);
>>>             return aggregate;
>>>         }
>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>        Serdes.String(), messagesSerde)
>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>             ...
>>>         }
>>>     });
>>>
>>> So basically I rekey the original message into another topic and then
>>> aggregate it based on that key.
>>> What I have observed is that when I use windowed aggregation, the
>>> aggregator does not use the previously aggregated value:
>>>
>>> public SortedSet<Message> apply(String aggKey, Message value,
>>>         SortedSet<Message> aggregate) {
>>>     aggregate.add(value);
>>>     return aggregate;
>>> }
>>>
>>> So in the above function the aggregate is an empty set for every value
>>> entering the pipeline. When I remove the windowed aggregation, the
>>> aggregate set retains previously aggregated values.
>>>
>>> I am just not able to wrap my head around it. When I ran this type of
>>> test locally on Windows it worked fine. However a similar pipeline
>>> setup, when run against production on Linux, behaves strangely and
>>> always gets an empty aggregate set.
>>> Any idea what the reason could be, and where I should look for the
>>> problem? Does the length of the key string matter here? I will later
>>> try to run the same simple setup on Linux and see what happens. But
>>> this is very strange behavior.
>>>
>>> Thanks
>>> Sachin
>>>
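One thing worth ruling out for the timestamp-extractor issue above: messages
produced by a pre-0.10 client such as kafka-node 0.5.x carry no producer
timestamp, so record.timestamp() can return -1 for any record that falls into
the else branch, and a negative extracted timestamp can cause records to be
silently dropped or mis-windowed. A defensive variant of the extractor may
help isolate this (a sketch against the 0.10.0.x API; the class name and the
wall-clock fallback are illustrative, not a confirmed fix):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class SafeMessageTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof Message) {
            long ts = ((Message) record.value()).ts;
            if (ts > 0) {
                return ts;  // embedded event time, when valid
            }
        }
        long recordTs = record.timestamp();
        // Records from old clients report -1 here; fall back to wall-clock
        // time rather than handing a negative timestamp to the windowing code.
        return recordTs >= 0 ? recordTs : System.currentTimeMillis();
    }
}

It would be registered the usual way:

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          SafeMessageTimestampExtractor.class.getName());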
>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Sachin,
>>>>
>>>> In the implementation of SortedSet, if the object implements the
>>>> Comparable interface, that compareTo function is applied in
>>>> "aggregate.add(value);", and hence if it returns 0, this element
>>>> will not be added, since it is a Set.
>>>>
>>>> Guozhang
>>>>
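Guozhang's explanation points at the fix: compareTo must not return 0 for two
messages that should both be kept. At the same time, the workaround of never
returning 0 (the "return ts > ts1 ? 1 : -1;" variant further up) violates the
Comparable contract, since x.compareTo(x) must be 0, which leaves TreeSet
behavior undefined. A contract-safe version breaks ties on the payload
instead (a sketch, assuming two messages with equal timestamps but different
content should both be kept, and that the message field is never null):

public int compareTo(Message other) {
    // Order primarily by timestamp ...
    int byTs = Long.compare(ts, other.ts);
    if (byTs != 0) {
        return byTs;
    }
    // ... and break ties on the payload, so distinct messages within the
    // same millisecond are not treated as duplicates and dropped by TreeSet.
    return message.compareTo(other.message);
}

With this ordering, an element is rejected as a duplicate only when both the
timestamp and the payload are identical, which is the usual Set semantics.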
>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> What I find is that when I use a sorted set as the aggregate, it
>>>>> fails to aggregate the values for which compareTo returns 0.
>>>>>
>>>>> My class is like this:
>>>>>
>>>>> public class Message implements Comparable<Message> {
>>>>>     public long ts;
>>>>>     public String message;
>>>>>     public Message() {};
>>>>>     public Message(long ts, String message) {
>>>>>         this.ts = ts;
>>>>>         this.message = message;
>>>>>     }
>>>>>     public int compareTo(Message paramT) {
>>>>>         long ts1 = paramT.ts;
>>>>>         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>     }
>>>>> }
>>>>>
>>>>> The pipeline is like this:
>>>>>
>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>         public SortedSet<Message> apply() {
>>>>>             return new TreeSet<Message>();
>>>>>         }
>>>>>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>         public SortedSet<Message> apply(String aggKey, Message value,
>>>>>                 SortedSet<Message> aggregate) {
>>>>>             aggregate.add(value);
>>>>>             return aggregate;
>>>>>         }
>>>>>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>        Serdes.String(), messagesSerde)
>>>>>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>             ...
>>>>>         }
>>>>>     });
>>>>>
>>>>> So any message published between 10 and 20 seconds gets aggregated
>>>>> into the 10 - 20 bucket, and I print the size of the set.
>>>>> However, the output I get is the following:
>>>>>
>>>>> Published: 14
>>>>> Aggregated: 10 20 -> 1
>>>>>
>>>>> Published: 18
>>>>> Aggregated: 10 20 -> 2
>>>>>
>>>>> Published: 11
>>>>> Aggregated: 10 20 -> 3
>>>>>
>>>>> Published: 17
>>>>> Aggregated: 10 20 -> 4
>>>>>
>>>>> Published: 14
>>>>> Aggregated: 10 20 -> 4
>>>>>
>>>>> Published: 15
>>>>> Aggregated: 10 20 -> 5
>>>>>
>>>>> Published: 12
>>>>> Aggregated: key2 10 20 -> 6
>>>>>
>>>>> Published: 12
>>>>> Aggregated: 10 20 -> 6
>>>>>
>>>>> So, as you can see, any message that occurs again in the same second,
>>>>> where compareTo returns 0, fails to get aggregated in the pipeline.
>>>>> Notice the ones published at 14 and 12 seconds.
>>>>>
>>>>> Now I am not sure whether the problem is with Java, i.e. whether I
>>>>> should use the Comparator interface rather than Comparable for my
>>>>> Message object, or whether the problem is with Kafka Streams or with
>>>>> serializing and de-serializing the set of messages. If I replace the
>>>>> Set with a List, everything works fine.
>>>>>
>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see
>>>>> what the best Java practice is here.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
>>>>>>> If the stream app using the timestamp extractor puts the message
>>>>>>> in one or more bucket(s), it will get aggregated in those buckets.
>>>>>>> I assume this statement is correct.
>>>>>>
>>>>>> Yes.
>>>>>>
>>>>>>> Also say that when I restart the streams application, bucket
>>>>>>> aggregation will resume from the last point of halt.
>>>>>>> I hope this is also correct.
>>>>>>
>>>>>> Yes.
>>>>>>
>>>>>>> What I noticed is that once a message was placed in one bucket,
>>>>>>> that bucket was not getting new messages.
>>>>>>
>>>>>> This should not happen...
>>>>>>
>>>>>>> However when I ran a small test case replicating that, it worked
>>>>>>> properly. There may be some issues in the application reset.
>>>>>>
>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>
>>>>>> Do you have any further information that you could share with us so
>>>>>> we can help you better? What's the difference, for example, between
>>>>>> your "normal" use case and the small test case you have been
>>>>>> referring to?
>>>>>>
>>>>>> -Michael
>>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>
>> --
>> -- Guozhang
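Michael's first "Yes" above follows from hopping-window semantics: with a
60-minute window advanced by 15 minutes, every record falls into
size / advance = 4 overlapping windows, and each of those buckets gets the
record aggregated into it. A standalone sketch of the arithmetic (the
alignment of window starts to multiples of the advance mirrors how
TimeWindows computes them; the timestamp value is just an example):

public class HoppingWindowsDemo {
    public static void main(String[] args) {
        long sizeMs = 60 * 60 * 1000L;     // window size: 60 minutes
        long advanceMs = 15 * 60 * 1000L;  // advance ("hop"): 15 minutes
        long ts = 1480000000000L;          // example record timestamp
        // A record belongs to every window [start, start + size) with
        // start <= ts < start + size, where start is a multiple of advanceMs.
        for (long start = ts - ts % advanceMs;
             start > ts - sizeMs && start >= 0;
             start -= advanceMs) {
            System.out.println("window [" + start + ", " + (start + sizeMs) + ")");
        }
    }
}

Running this prints four overlapping windows, which is why a single message
shows up in multiple buckets of a 60/15 aggregation.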