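Hello Sachin,

In TreeSet, the SortedSet implementation you are using, the element type's compareTo method (from the Comparable interface) is applied inside "aggregate.add(value);". If compareTo returns 0 when comparing the new element with one already in the set, the new element is treated as a duplicate and is not added, since it is a Set. Returning 0 for two messages with the same timestamp therefore makes them equal as far as the set is concerned.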
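To illustrate, here is a minimal, self-contained sketch. `TreeSetTieDemo`, its helper methods, and `BY_TS_THEN_TEXT` are hypothetical names used only for this example. It copies the timestamp-only ordering of your `Message` class and shows `TreeSet.add` dropping a second message that has the same timestamp. It then breaks ties on another field, here the message text, so that both messages are kept.

This is an ordering problem, not a Comparable-vs-Comparator problem. Any ordering that returns 0 for two distinct elements makes them duplicates in a TreeSet.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class TreeSetTieDemo {
    // Same ordering as the Message class in the quoted mail: timestamp only.
    static class Message implements Comparable<Message> {
        final long ts;
        final String message;

        Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }

        @Override
        public int compareTo(Message other) {
            return Long.compare(ts, other.ts);
        }
    }

    // Hypothetical tie-breaking comparator: timestamp first, then text.
    // Assumes the text is never null.
    static final Comparator<Message> BY_TS_THEN_TEXT =
            Comparator.comparingLong((Message m) -> m.ts)
                      .thenComparing(m -> m.message);

    static int sizeWithNaturalOrder() {
        SortedSet<Message> set = new TreeSet<>();   // uses Message.compareTo
        set.add(new Message(14, "a"));
        set.add(new Message(14, "b"));              // compareTo == 0, so add() returns false
        return set.size();
    }

    static int sizeWithTieBreak() {
        SortedSet<Message> set = new TreeSet<>(BY_TS_THEN_TEXT);
        set.add(new Message(14, "a"));
        set.add(new Message(14, "b"));              // texts differ, so it is added
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeWithNaturalOrder()); // 1
        System.out.println(sizeWithTieBreak());     // 2
    }
}
```

The second variant passes the comparator to the TreeSet in the Initializer. If you use it, the deserializer behind `messagesSerde` (not shown in your mail) would presumably also need to rebuild the set with the same comparator. Putting the tie-break directly into `Message.compareTo` avoids that dependency. Either way, two messages with the same timestamp *and* the same text will still collapse into one.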
Guozhang

On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> What I find is that when I use a sorted set as the aggregation, it fails to
> aggregate values for which compareTo returns 0.
>
> My class is like this:
>
> public class Message implements Comparable<Message> {
>     public long ts;
>     public String message;
>
>     public Message() {}
>
>     public Message(long ts, String message) {
>         this.ts = ts;
>         this.message = message;
>     }
>
>     public int compareTo(Message paramT) {
>         long ts1 = paramT.ts;
>         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>     }
> }
>
> The pipeline is like this:
>
> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>     .aggregateByKey(new Initializer<SortedSet<Message>>() {
>         public SortedSet<Message> apply() {
>             return new TreeSet<Message>();
>         }
>     }, new Aggregator<String, Message, SortedSet<Message>>() {
>         public SortedSet<Message> apply(String aggKey, Message value,
>                 SortedSet<Message> aggregate) {
>             aggregate.add(value);
>             return aggregate;
>         }
>     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>     Serdes.String(), messagesSerde)
>     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>         public void apply(Windowed<String> key, SortedSet<Message> messages) {
>             ...
>         }
>     });
>
> So any message published between 10 and 20 seconds gets aggregated in the
> 10-20 bucket, and I print the size of the set.
> However, the output I get is the following:
>
> Published: 14
> Aggregated: 10 20 -> 1
>
> Published: 18
> Aggregated: 10 20 -> 2
>
> Published: 11
> Aggregated: 10 20 -> 3
>
> Published: 17
> Aggregated: 10 20 -> 4
>
> Published: 14
> Aggregated: 10 20 -> 4
>
> Published: 15
> Aggregated: 10 20 -> 5
>
> Published: 12
> Aggregated: key2 10 20 -> 6
>
> Published: 12
> Aggregated: 10 20 -> 6
>
> So, as you can see, any message that occurs again in the same second, where
> compareTo returns 0, fails to get aggregated in the pipeline.
> Notice the ones published at 14 and 12 seconds.
>
> Now I am not sure whether the problem is with Java, i.e. I should use the
> Comparator interface and not Comparable for my Message object, or whether
> the problem is with Kafka Streams or with serializing and deserializing the
> set of messages. If I replace the Set with a List, everything works fine.
>
> Anyway, any ideas here would be appreciated. Meanwhile, let me see what the
> best Java practice is here.
>
> Thanks
> Sachin
>
>
>
> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
> >
> > > I am using kafka_2.10-0.10.0.1.
> > > Say I have a window of 60 minutes advanced by 15 minutes.
> > > If the Streams app, using a timestamp extractor, puts the message in one
> > > or more bucket(s), it will get aggregated in those buckets.
> > > I assume this statement is correct.
> > >
> >
> > Yes.
> >
> >
> > >
> > > Also, say I restart the Streams application: then bucket aggregation
> > > will resume from the last point of halt.
> > > I hope this is also correct.
> > >
> >
> > Yes.
> >
> >
> > > What I noticed is that once a message was placed in one bucket, that
> > > bucket was not getting new messages.
> > >
> >
> > This should not happen...
> >
> >
> > > However, when I ran a small test case replicating that, it worked
> > > properly. There may be some issues with the application reset.
> > >
> >
> > ...and apparently it works (as expected) in your small test case.
> >
> > Do you have any further information that you could share with us so we can
> > help you better? What's the difference, for example, between your "normal"
> > use case and the small test case you have been referring to?
> >
> >
> > -Michael
> >
>


--
-- Guozhang
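The quoted exchange above asks how one timestamp ends up in "one or more bucket(s)". The sketch below shows that mapping under one assumption: hopping windows of length `sizeMs` whose starts fall on multiples of `advanceMs`, counted from 0 (epoch-aligned). `HoppingWindowDemo` and `windowStartsFor` are hypothetical names, and this is not the Kafka Streams source.

With the pipeline's 10-second windows advancing by 5 seconds, a message at 14 s lands in both the [5 s, 15 s) and [10 s, 20 s) windows. With 60-minute windows advancing by 15 minutes, any timestamp at or after the 45-minute mark lands in four windows (60 / 15).

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowDemo {
    // Returns the start of every window [start, start + sizeMs) that contains
    // timestampMs, assuming window starts are multiples of advanceMs from 0.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("require 0 < advanceMs <= sizeMs");
        }
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start whose window still reaches past timestampMs.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10 s windows advancing by 5 s, message at 14 s
        System.out.println(windowStartsFor(14_000L, 10_000L, 5_000L)); // [5000, 10000]
        // 60 min windows advancing by 15 min, message at minute 70
        System.out.println(windowStartsFor(70 * 60_000L, 60 * 60_000L, 15 * 60_000L).size()); // 4
    }
}
```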