Hello Sachin,

In the SortedSet implementation you are using (TreeSet), if the element
class implements the Comparable interface, its compareTo function is applied
in "aggregate.add(value);". If it returns 0 against an element already in the
set, the new element is treated as a duplicate and is not added, since it is
a Set.
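
A minimal sketch of that behaviour outside of Kafka Streams, in case it helps.
The class name SortedSetDemo and the BY_TS_THEN_TEXT comparator are
hypothetical names made up for this illustration. The tie-break on the message
text is only one possible choice and assumes the text is never null; it would
still drop two messages with the same timestamp and the same text.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class SortedSetDemo {

    // Same shape as the Message class in the original mail: ordered by ts only.
    static class Message implements Comparable<Message> {
        final long ts;
        final String message;

        Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }

        @Override
        public int compareTo(Message other) {
            return Long.compare(ts, other.ts);
        }
    }

    // Hypothetical alternative ordering: ts first, then the message text,
    // so two different messages in the same second no longer compare as 0.
    static final Comparator<Message> BY_TS_THEN_TEXT =
            Comparator.comparingLong((Message m) -> m.ts)
                      .thenComparing(m -> m.message);

    public static void main(String[] args) {
        Message first = new Message(14L, "first");
        Message second = new Message(14L, "second");

        // Natural ordering: compareTo returns 0, so the second add is a no-op.
        SortedSet<Message> byTsOnly = new TreeSet<>();
        byTsOnly.add(first);
        boolean added = byTsOnly.add(second);
        System.out.println("natural ordering: added=" + added
                + ", size=" + byTsOnly.size());

        // With the tie-break comparator both elements are kept.
        SortedSet<Message> withTieBreak = new TreeSet<>(BY_TS_THEN_TEXT);
        withTieBreak.add(first);
        withTieBreak.add(second);
        System.out.println("with tie-break: size=" + withTieBreak.size());
    }
}
```

If duplicates in the same second must all be kept regardless of content, a
List (as you already tried) avoids the issue entirely.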


Guozhang


On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> What I find is that when I use sorted set as aggregation it fails to
> aggregate the values which have compareTo returning 0.
>
> My class is like this:
>     public class Message implements Comparable<Message> {
>         public long ts;
>         public String message;
>         public Message() {};
>         public Message(long ts, String message) {
>             this.ts = ts;
>             this.message = message;
>         }
>         public int compareTo(Message paramT) {
>             long ts1 = paramT.ts;
>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>         }
>     }
>
> pipeline is like this:
> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>     public SortedSet<Message> apply() {
>         return new TreeSet<Message>();
>     }
> }, new Aggregator<String, Message, SortedSet<Message>>() {
>     public SortedSet<Message> apply(String aggKey, Message value,
> SortedSet<Message> aggregate) {
>         aggregate.add(value);
>         return aggregate;
>     }
> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> Serdes.String(), messagesSerde)
> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>         ...
>     }
> });
>
> So any message published between 10 and 20 seconds gets aggregated in the
> 10 - 20 bucket, and I print the size of the set.
> However, the output I get is the following:
>
> Published: 14
> Aggregated: 10  20 -> 1
>
> Published: 18
> Aggregated: 10  20 -> 2
>
> Published: 11
> Aggregated: 10  20 -> 3
>
> Published: 17
> Aggregated: 10  20 -> 4
>
> Published: 14
> Aggregated: 10  20 -> 4
>
> Published: 15
> Aggregated: 10  20 -> 5
>
> Published: 12
> Aggregated: key2  10  20 -> 6
>
> Published: 12
> Aggregated: 10  20 -> 6
>
> So, as you can see, any message that occurs again for the same second, where
> compareTo returns 0, fails to get aggregated in the pipeline.
> Notice the ones published at 14 and 12 seconds.
>
> Now I am not sure if the problem is with Java, i.e. I should use the
> Comparator interface and not Comparable for my Message object, or if the
> problem is with Kafka Streams or with serializing and de-serializing the set
> of messages. If I replace the Set with a List, all is working fine.
>
> Anyway any ideas here would be appreciated, meanwhile let me see what is
> the best java practice here.
>
> Thanks
> Sachin
>
>
>
> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
> >
> > > I am using kafka_2.10-0.10.0.1.
> > > Say I am having a window of 60 minutes advanced by 15 minutes.
> > > If the stream app using timestamp extractor puts the message in one or
> > more
> > > bucket(s), it will get aggregated in those buckets.
> > > I assume this statement is correct.
> > >
> >
> > Yes.
> >
> >
> >
> > >
> > > Also say when I restart the streams application then bucket aggregation
> > > will resume from last point of halt.
> > > I hope this is also correct.
> > >
> >
> > Yes.
> >
> >
> >
> > > What I noticed that once a message is placed in one bucket, that bucket
> > was
> > > not getting new messages.
> > >
> >
> > This should not happen...
> >
> >
> > > However when I ran a small test case replicating that, it is working
> > > properly. There maybe some issues in application reset.
> > >
> >
> > ...and apparently it works (as expected) in your small test case.
> >
> > Do you have any further information that you could share with us so we
> can
> > help you better?  What's the difference, for example, between your
> "normal"
> > use case and the small test case you have been referring to?
> >
> >
> > -Michael
> >
>



-- 
-- Guozhang
