Hi,
What I find is that when I use a sorted set as the aggregation value, it
fails to aggregate messages whose compareTo returns 0 against an element
already in the set.

My class is like this:
    public class Message implements Comparable<Message> {
        public long ts;
        public String message;

        public Message() {}

        public Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }

        // Note: only the timestamp is compared, so two different
        // messages with the same ts compare as 0.
        @Override
        public int compareTo(Message other) {
            return ts == other.ts ? 0 : ts > other.ts ? 1 : -1;
        }
    }

The pipeline is like this:

builder.stream(Serdes.String(), messageSerde, "test-window-stream")
    .aggregateByKey(
        new Initializer<SortedSet<Message>>() {
            public SortedSet<Message> apply() {
                return new TreeSet<Message>();
            }
        },
        new Aggregator<String, Message, SortedSet<Message>>() {
            public SortedSet<Message> apply(String aggKey, Message value,
                    SortedSet<Message> aggregate) {
                // TreeSet.add() returns false and drops the value if an
                // element comparing as 0 is already present.
                aggregate.add(value);
                return aggregate;
            }
        },
        TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
        Serdes.String(), messagesSerde)
    .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
        public void apply(Windowed<String> key, SortedSet<Message> messages) {
            ...
        }
    });
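
As an aside, with a 10-second window advancing by 5 seconds, every record
actually falls into two overlapping hopping windows; here is a quick sketch
of the bucketing arithmetic (my own illustration, not Kafka's internal code):

    public class WindowBuckets {
        public static void main(String[] args) {
            long sizeMs = 10 * 1000L;   // window size
            long advanceMs = 5 * 1000L; // hop
            long ts = 14 * 1000L;       // e.g. a message published at 14 seconds

            // A record with timestamp ts belongs to every window
            // [start, start + size) where start is a multiple of the
            // advance and start <= ts < start + size.
            for (long start = (ts / advanceMs) * advanceMs;
                    start >= 0 && start + sizeMs > ts;
                    start -= advanceMs) {
                System.out.println("[" + start + ", " + (start + sizeMs) + ")");
            }
            // Prints [10000, 20000) and [5000, 15000).
        }
    }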

So any message published between 10 and 20 seconds gets aggregated into the
10-20 bucket, and I print the size of the set.
However, the output I get is the following:

Published: 14
Aggregated: 10  20 -> 1

Published: 18
Aggregated: 10  20 -> 2

Published: 11
Aggregated: 10  20 -> 3

Published: 17
Aggregated: 10  20 -> 4

Published: 14
Aggregated: 10  20 -> 4

Published: 15
Aggregated: 10  20 -> 5

Published: 12
Aggregated: key2  10  20 -> 6

Published: 12
Aggregated: 10  20 -> 6

So, as you can see, any message that arrives again within the same second,
where compareTo returns 0, fails to get aggregated in the pipeline. Notice
the ones published at 14 and 12 seconds: the set size does not grow.

Now I am not sure if the problem is with Java, i.e. whether I should use the
Comparator interface rather than Comparable for my Message object, or if the
problem is with Kafka Streams, or with serializing and de-serializing the
set of messages. If I replace the Set with a List, everything works fine.
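
That the List works points at TreeSet semantics rather than at Kafka: a
TreeSet decides element equality purely via compareTo (or a supplied
Comparator), so add() silently returns false and drops an element that
compares as 0 to one already in the set. A minimal check outside Kafka (my
own sketch, reusing the Message class above):

    import java.util.TreeSet;

    public class TreeSetDedupCheck {
        public static void main(String[] args) {
            TreeSet<Message> set = new TreeSet<Message>();
            System.out.println(set.add(new Message(14 * 1000L, "first")));  // true
            System.out.println(set.add(new Message(14 * 1000L, "second"))); // false: compareTo == 0
            System.out.println(set.size());                                 // 1 -> "second" was dropped
        }
    }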

Anyway, any ideas here would be appreciated; meanwhile, let me see what the
best Java practice is here.
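
One candidate fix on the Java side (assuming a timestamp tie is the only way
two distinct messages compare as equal) would be to break ties on the
message body, so that compareTo returns 0 only for genuine duplicates:

    @Override
    public int compareTo(Message other) {
        // Order by timestamp first; break ties on the message body so two
        // different messages with the same ts are not collapsed by TreeSet.
        int byTs = Long.compare(ts, other.ts);
        return byTs != 0 ? byTs : message.compareTo(other.message);
    }

The same tie-breaking rule could equally be supplied as a Comparator to the
TreeSet constructor inside the Initializer; either way, the invariant to
keep is that the comparison returns 0 only for messages that really should
be de-duplicated.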

Thanks
Sachin



On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:

> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > I am using kafka_2.10-0.10.0.1.
> > Say I have a window of 60 minutes advanced by 15 minutes.
> > If the stream app, using the timestamp extractor, puts the message in
> > one or more bucket(s), it will get aggregated in those buckets.
> > I assume this statement is correct.
> >
>
> Yes.
>
>
>
> >
> > Also say when I restart the streams application then bucket aggregation
> > will resume from last point of halt.
> > I hope this is also correct.
> >
>
> Yes.
>
>
>
> > What I noticed is that once a message was placed in one bucket, that
> > bucket was not getting new messages.
> >
>
> This should not happen...
>
>
> > However, when I ran a small test case replicating that, it is working
> > properly. There may be some issues in the application reset.
> >
>
> ...and apparently it works (as expected) in your small test case.
>
> Do you have any further information that you could share with us so we can
> help you better?  What's the difference, for example, between your "normal"
> use case and the small test case you have been referring to?
>
>
> -Michael
>
