Hi,
I fixed that sorted set issue but I am facing a weird problem which I am
not able to replicate.

Here is the sample problem that I could isolate:
My class is like this:
    public static class Message implements Comparable<Message> {
        public long ts;
        public String message;
        public String key;
        public Message() {};
        public Message(long ts, String message, String key) {
            this.ts = ts;
            this.key = key;
            this.message = message;
        }
        public int compareTo(Message paramT) {
            long ts1 = paramT.ts;
            return ts > ts1 ? 1 : -1;
        }
    }

pipeline is like this:
builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
 .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
     public KeyValue<String, Message> apply(String key, Message value) {
         return new KeyValue<String, Message>(value.key, value);
      }
 })
.through(Serdes.String(), messageSerde, "test-window-key-stream")
.aggregateByKey(new Initializer<SortedSet<Message>>() {
    public SortedSet<Message> apply() {
        return new TreeSet<Message>();
    }
}, new Aggregator<String, Message, SortedSet<Message>>() {
    public SortedSet<Message> apply(String aggKey, Message value,
SortedSet<Message> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
Serdes.String(), messagesSerde)
.foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
    public void apply(Windowed<String> key, SortedSet<Message> messages) {
        ...
    }
});

So basically I rekey the original message into another topic and then
aggregate it based on that key.
What I have observed is that when I used windowed aggregation the
aggregator does not use previous aggregated value.

public SortedSet<Message> apply(String aggKey, Message value,
SortedSet<Message> aggregate) {
    aggregate.add(value);
    return aggregate;
}

So in the above function the aggregate is an empty set of every value
entering into pipeline. When I remove the windowed aggregation, the
aggregate set retains previously aggregated values in the set.

I am just not able to wrap my head around it. When I ran this type of test
locally on windows it is working fine. However a similar pipeline setup
when run against production on linux is behaving strangely and always
getting an empty aggregate set.
Any idea what could be the reason, where should I look at the problem. Does
length of key string matters here? I will later try to run the same simple
setup on linux and see what happens. But this is a very strange behavior.

Thanks
Sachin



On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Sachin,
>
> In the implementation of SortedSet, if the object's implemented the
> Comparable interface, that compareTo function is applied in "
> aggregate.add(value);", and hence if it returns 0, this element will not be
> added since it is a Set.
>
>
> Guozhang
>
>
> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
>
> > Hi,
> > What I find is that when I use sorted set as aggregation it fails to
> > aggregate the values which have compareTo returning 0.
> >
> > My class is like this:
> >     public class Message implements Comparable<Message> {
> >         public long ts;
> >         public String message;
> >         public Message() {};
> >         public Message(long ts, String message) {
> >             this.ts = ts;
> >             this.message = message;
> >         }
> >         public int compareTo(Message paramT) {
> >             long ts1 = paramT.ts;
> >             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> >         }
> >     }
> >
> > pipeline is like this:
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >     public SortedSet<Message> apply() {
> >         return new TreeSet<Message>();
> >     }
> > }, new Aggregator<String, Message, SortedSet<Message>>() {
> >     public SortedSet<Message> apply(String aggKey, Message value,
> > SortedSet<Message> aggregate) {
> >         aggregate.add(value);
> >         return aggregate;
> >     }
> > }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > Serdes.String(), messagesSerde)
> > .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >     public void apply(Windowed<String> key, SortedSet<Message> messages)
> {
> >         ...
> >     }
> > });
> >
> > So any message published between 10 and 20 seconds gets aggregated in 10
> -
> > 20 bucket and I print the size of the set.
> > However output I get is following:
> >
> > Published: 14
> > Aggregated: 10  20 -> 1
> >
> > Published: 18
> > Aggregated: 10  20 -> 2
> >
> > Published: 11
> > Aggregated: 10  20 -> 3
> >
> > Published: 17
> > Aggregated: 10  20 -> 4
> >
> > Published: 14
> > Aggregated: 10  20 -> 4
> >
> > Published: 15
> > Aggregated: 10  20 -> 5
> >
> > Published: 12
> > Aggregated: key2  10  20 -> 6
> >
> > Published: 12
> > Aggregated: 10  20 -> 6
> >
> > So if you see any message that occurs again for same second, where
> > compareTo returns 0, it fails to get aggregated in the pipeline.
> > Notice ones published at 14 and 12 seconds.
> >
> > Now I am not sure if problem is with Java ie I should use Comparator
> > interface and not Comparable for my Message object. Or the problem is
> with
> > Kafka stream or with serializing and de-serializing the set of messages.
> If
> > I replace Set with List all is working fine.
> >
> > Anyway any ideas here would be appreciated, meanwhile let me see what is
> > the best java practice here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
> > wrote:
> > >
> > > > I am using kafka_2.10-0.10.0.1.
> > > > Say I am having a window of 60 minutes advanced by 15 minutes.
> > > > If the stream app using timestamp extractor puts the message in one
> or
> > > more
> > > > bucket(s), it will get aggregated in those buckets.
> > > > I assume this statement is correct.
> > > >
> > >
> > > Yes.
> > >
> > >
> > >
> > > >
> > > > Also say when I restart the streams application then bucket
> aggregation
> > > > will resume from last point of halt.
> > > > I hope this is also correct.
> > > >
> > >
> > > Yes.
> > >
> > >
> > >
> > > > What I noticed that once a message is placed in one bucket, that
> bucket
> > > was
> > > > not getting new messages.
> > > >
> > >
> > > This should not happen...
> > >
> > >
> > > > However when I ran a small test case replicating that, it is working
> > > > properly. There maybe some issues in application reset.
> > > >
> > >
> > > ...and apparently it works (as expected) in your small test case.
> > >
> > > Do you have any further information that you could share with us so we
> > can
> > > help you better?  What's the difference, for example, between your
> > "normal"
> > > use case and the small test case you have been referring to?
> > >
> > >
> > > -Michael
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to