Hi,
After much debugging I found an issue with the timestamp extractor.
If I use a custom timestamp extractor with the following code:
public static class MessageTimestampExtractor implements TimestampExtractor {
    public long extract(ConsumerRecord<Object, Object> record) {
        if (record.value() instanceof Message) {
            return ((Message) record.value()).ts;
        } else {
            return record.timestamp();
        }
    }
}
Here Message has a long field ts which stores the timestamp. With this
extractor the aggregation does not work.
Note that I have checked, and ts holds valid timestamp values.
However, if I replace it with, say, WallclockTimestampExtractor, the
aggregation works fine.
I do not understand what the issue could be here.
Also note that I am using Kafka Streams version 0.10.0.1 and I am publishing
messages via
https://github.com/SOHU-Co/kafka-node/ whose version is quite old (0.5.x).
Let me know if there is some bug in timestamp extraction.
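
A minimal sketch of sanity checks that could be run on ts before relying on it as event time. It is plain Java with no Kafka dependency. The class name, method names, and thresholds are hypothetical. The one-day figure assumes the window store keeps roughly a day of data, which is an assumption and not something confirmed in this thread. The negative case assumes that record.timestamp() can be negative for records that carry no timestamp, which may apply to an old producer client.

```java
class TimestampSanitySketch {
    // Assumed retention for windowed state; not confirmed for this setup.
    static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

    // Heuristic: a positive epoch value below 10^11 is more plausibly
    // seconds than milliseconds (10^11 ms falls in 1973).
    static boolean looksLikeSeconds(long ts) {
        return ts > 0 && ts < 100_000_000_000L;
    }

    // True when ts lies more than retentionMs behind nowMs.
    static boolean olderThan(long ts, long nowMs, long retentionMs) {
        return nowMs - ts > retentionMs;
    }

    // Returns a short description of the first problem found, or "ok".
    static String describe(long ts, long nowMs) {
        if (ts < 0) {
            return "negative";
        }
        if (looksLikeSeconds(ts)) {
            return "looks like seconds, not milliseconds";
        }
        if (olderThan(ts, nowMs, ONE_DAY_MS)) {
            return "more than one day behind the reference time";
        }
        return "ok";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(describe(now, now));
        System.out.println(describe(now / 1000, now));
    }
}
```

Logging describe(ts, System.currentTimeMillis()) inside the extractor for a handful of records would show whether the values differ in unit or age from what WallclockTimestampExtractor produces.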
Thanks
Sachin
On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <[email protected]> wrote:
> Sachin,
>
> This is indeed a bit weird, and we'd like to try to reproduce your issue
> locally. Do you have some sample input data for us to try out?
>
> Guozhang
>
> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <[email protected]>
> wrote:
>
> > Hi,
> > I fixed that sorted set issue but I am facing a weird problem which I am
> > not able to replicate.
> >
> > Here is the sample problem that I could isolate:
> > My class is like this:
> > public static class Message implements Comparable<Message> {
> >     public long ts;
> >     public String message;
> >     public String key;
> >     public Message() {};
> >     public Message(long ts, String message, String key) {
> >         this.ts = ts;
> >         this.key = key;
> >         this.message = message;
> >     }
> >     public int compareTo(Message paramT) {
> >         long ts1 = paramT.ts;
> >         return ts > ts1 ? 1 : -1;
> >     }
> > }
> >
> > pipeline is like this:
> > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> >     .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
> >         public KeyValue<String, Message> apply(String key, Message value) {
> >             return new KeyValue<String, Message>(value.key, value);
> >         }
> >     })
> >     .through(Serdes.String(), messageSerde, "test-window-key-stream")
> >     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> >         public SortedSet<Message> apply() {
> >             return new TreeSet<Message>();
> >         }
> >     }, new Aggregator<String, Message, SortedSet<Message>>() {
> >         public SortedSet<Message> apply(String aggKey, Message value,
> >                 SortedSet<Message> aggregate) {
> >             aggregate.add(value);
> >             return aggregate;
> >         }
> >     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> >     Serdes.String(), messagesSerde)
> >     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> >         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> >             ...
> >         }
> >     });
> >
> > So basically I rekey the original message into another topic and then
> > aggregate it based on that key.
> > What I have observed is that when I use windowed aggregation the
> > aggregator does not use the previously aggregated value.
> >
> > public SortedSet<Message> apply(String aggKey, Message value,
> >         SortedSet<Message> aggregate) {
> >     aggregate.add(value);
> >     return aggregate;
> > }
> >
> > So in the above function the aggregate is an empty set for every value
> > entering the pipeline. When I remove the windowed aggregation, the
> > aggregate set retains the previously aggregated values.
> >
> > I am just not able to wrap my head around it. When I ran this type of test
> > locally on Windows it worked fine. However, a similar pipeline setup
> > run against production on Linux behaves strangely and always
> > gets an empty aggregate set.
> > Any idea what the reason could be, or where I should look for the problem?
> > Does the length of the key string matter here? I will later try to run the
> > same simple setup on Linux and see what happens. But this is very strange
> > behavior.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <[email protected]>
> > wrote:
> >
> > > Hello Sachin,
> > >
> > > In the implementation of SortedSet, if the object implements the
> > > Comparable interface, its compareTo function is applied in
> > > "aggregate.add(value);", and hence if it returns 0, the element will not
> > > be added since it is a Set.
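
The behavior described above can be reproduced without Kafka. The following is a minimal sketch in plain Java: the simplified Message class and the two comparators are stand-ins invented for illustration, and the tie-breaker on the message text is only one possible choice. It assumes the message field is never null.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

class SortedSetTieBreakSketch {
    // Simplified stand-in for the Message class discussed in this thread.
    static class Message {
        final long ts;
        final String message;

        Message(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }
    }

    // Orders by timestamp only: two messages with the same ts compare as 0.
    static final Comparator<Message> BY_TS_ONLY =
        Comparator.comparingLong((Message m) -> m.ts);

    // Orders by timestamp, then by text: 0 only when both fields match.
    static final Comparator<Message> BY_TS_THEN_TEXT =
        BY_TS_ONLY.thenComparing((Message m) -> m.message);

    // Adds all messages to a TreeSet with the given ordering, returns its size.
    static int sizeAfterAdding(Comparator<Message> order, Message... messages) {
        SortedSet<Message> set = new TreeSet<>(order);
        for (Message m : messages) {
            set.add(m);
        }
        return set.size();
    }

    public static void main(String[] args) {
        Message a = new Message(14, "first");
        Message b = new Message(14, "second");
        System.out.println(sizeAfterAdding(BY_TS_ONLY, a, b));
        System.out.println(sizeAfterAdding(BY_TS_THEN_TEXT, a, b));
    }
}
```

A compareTo that never returns 0 would keep both elements, but it breaks the Comparable contract, so a tie-breaker on a second field is the safer route.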
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <[email protected]>
> > > wrote:
> > >
> > > > Hi,
> > > > What I find is that when I use sorted set as aggregation it fails to
> > > > aggregate the values which have compareTo returning 0.
> > > >
> > > > My class is like this:
> > > > > public class Message implements Comparable<Message> {
> > > > >     public long ts;
> > > > >     public String message;
> > > > >     public Message() {};
> > > > >     public Message(long ts, String message) {
> > > > >         this.ts = ts;
> > > > >         this.message = message;
> > > > >     }
> > > > >     public int compareTo(Message paramT) {
> > > > >         long ts1 = paramT.ts;
> > > > >         return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
> > > > >     }
> > > > > }
> > > >
> > > > pipeline is like this:
> > > > > builder.stream(Serdes.String(), messageSerde, "test-window-stream")
> > > > >     .aggregateByKey(new Initializer<SortedSet<Message>>() {
> > > > >         public SortedSet<Message> apply() {
> > > > >             return new TreeSet<Message>();
> > > > >         }
> > > > >     }, new Aggregator<String, Message, SortedSet<Message>>() {
> > > > >         public SortedSet<Message> apply(String aggKey, Message value,
> > > > >                 SortedSet<Message> aggregate) {
> > > > >             aggregate.add(value);
> > > > >             return aggregate;
> > > > >         }
> > > > >     }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
> > > > >     Serdes.String(), messagesSerde)
> > > > >     .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
> > > > >         public void apply(Windowed<String> key, SortedSet<Message> messages) {
> > > > >             ...
> > > > >         }
> > > > >     });
> > > >
> > > > > So any message published between 10 and 20 seconds gets aggregated in
> > > > > the 10 - 20 bucket and I print the size of the set.
> > > > > However, the output I get is the following:
> > > >
> > > > Published: 14
> > > > Aggregated: 10 20 -> 1
> > > >
> > > > Published: 18
> > > > Aggregated: 10 20 -> 2
> > > >
> > > > Published: 11
> > > > Aggregated: 10 20 -> 3
> > > >
> > > > Published: 17
> > > > Aggregated: 10 20 -> 4
> > > >
> > > > Published: 14
> > > > Aggregated: 10 20 -> 4
> > > >
> > > > Published: 15
> > > > Aggregated: 10 20 -> 5
> > > >
> > > > Published: 12
> > > > Aggregated: key2 10 20 -> 6
> > > >
> > > > Published: 12
> > > > Aggregated: 10 20 -> 6
> > > >
> > > > > As you can see, any message that occurs again for the same second,
> > > > > where compareTo returns 0, fails to get aggregated in the pipeline.
> > > > > Notice the ones published at 14 and 12 seconds.
> > > >
> > > > > Now I am not sure if the problem is with Java, i.e. I should use the
> > > > > Comparator interface and not Comparable for my Message object, or if
> > > > > the problem is with Kafka Streams or with serializing and deserializing
> > > > > the set of messages. If I replace the Set with a List, everything works
> > > > > fine.
> > > > >
> > > > > Anyway, any ideas here would be appreciated; meanwhile let me see what
> > > > > the best Java practice is here.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > > On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <[email protected]>
> > > > wrote:
> > > >
> > > > > > On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <[email protected]> wrote:
> > > > >
> > > > > > I am using kafka_2.10-0.10.0.1.
> > > > > > > Say I have a window of 60 minutes advancing by 15 minutes.
> > > > > > > If the streams app, using the timestamp extractor, puts the message
> > > > > > > in one or more bucket(s), it will get aggregated in those buckets.
> > > > > > I assume this statement is correct.
> > > > > >
> > > > >
> > > > > Yes.
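
The "one or more bucket(s)" point can be made concrete with a little arithmetic. The following is a minimal sketch in plain Java of how a timestamp maps onto overlapping (hopping) windows. It assumes windows start at multiples of the advance interval counted from 0 and that timestamps are non-negative. The class and method names are hypothetical and the code is not taken from the Kafka source.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {
    // Returns the start times of all windows [start, start + sizeMs) that
    // contain timestampMs, where a new window begins every advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start that can still cover the timestamp.
        long first = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        // A record at minute 100 with 60-minute windows advancing by 15 minutes.
        System.out.println(windowStartsFor(100 * minute, 60 * minute, 15 * minute));
    }
}
```

With a 60-minute window advancing by 15 minutes, a record normally lands in four windows, so an aggregator would be expected to run once per window for each record.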
> > > > >
> > > > >
> > > > >
> > > > > >
> > > > > > > Also, when I restart the streams application, bucket aggregation
> > > > > > > will resume from the point where it halted.
> > > > > > > I hope this is also correct.
> > > > > >
> > > > >
> > > > > Yes.
> > > > >
> > > > >
> > > > >
> > > > > > > What I noticed is that once a message was placed in one bucket,
> > > > > > > that bucket was not getting new messages.
> > > > > >
> > > > >
> > > > > This should not happen...
> > > > >
> > > > >
> > > > > > > However, when I ran a small test case replicating that, it worked
> > > > > > > properly. There may be some issues in application reset.
> > > > > >
> > > > >
> > > > > ...and apparently it works (as expected) in your small test case.
> > > > >
> > > > > > Do you have any further information that you could share with us so
> > > > > > we can help you better? What's the difference, for example, between
> > > > > > your "normal" use case and the small test case you have been
> > > > > > referring to?
> > > > >
> > > > >
> > > > > -Michael
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>