The extractor is used in


Let us know if you were able to resolve the problem or need more help.
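
One way to narrow this down on the application side is to make the extractor defensive about the value it returns. The sketch below does not use the Kafka API; it only illustrates a possible fallback order (timestamp embedded in the payload, then the record timestamp, then wall-clock time). The class and method names are hypothetical, and the assumption that a missing record timestamp shows up as a negative value should be checked against the client version in use.

```java
// Hypothetical helper, not part of Kafka: picks the timestamp an extractor could return.
final class TimestampFallback {
    private TimestampFallback() {}

    /**
     * @param embeddedTs  timestamp carried in the message payload, or null if absent
     * @param recordTs    timestamp attached to the record; assumed to be negative when
     *                    the producer did not set one (e.g. an older message format)
     * @param wallClockMs current wall-clock time in milliseconds, used as a last resort
     */
    static long choose(Long embeddedTs, long recordTs, long wallClockMs) {
        if (embeddedTs != null && embeddedTs >= 0) {
            return embeddedTs;   // prefer the payload timestamp when it looks valid
        }
        if (recordTs >= 0) {
            return recordTs;     // otherwise use the record timestamp if one was set
        }
        return wallClockMs;      // nothing usable: fall back to processing time
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(choose(1480700000000L, -1L, now)); // payload timestamp wins
        System.out.println(choose(null, -1L, now) == now);    // falls back to wall clock
    }
}
```

Logging the chosen value next to the raw inputs inside the real extractor would show whether records from the older producer take a different branch than records from the Java client.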


On 12/2/16 11:46 AM, Sachin Mittal wrote:
> This is the Node.js client I am
> using. The version is 0.5.x. Can you please tell me which code in Streams
> calls the timestamp extractor? I can look there to see if there is any
> issue.
> Again, the issue happens only when producing messages with a producer that is
> compatible with Kafka version 0.8.x. I see that this producer does not send
> a record timestamp, as this was introduced only in version 0.10.
> Thanks
> Sachin
> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" wrote:
>> I am not sure what is happening. That's why it would be good to have a
>> toy example to reproduce the issue.
>> What do you mean by "Kafka node version 0.5"?
>> -Matthias
>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>> I can provide the data, but the data does not seem to be the issue.
>>> If I submit the same data and use the same timestamp extractor using the
>>> java client with kafka version aggregation works fine.
>>> I find the issue only when submitting the data with kafka node version 0.5.
>>> It looks like the stream does not extract the time correctly in that case.
>>> Thanks
>>> Sachin
>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" wrote:
>>>> Can you provide example input data (including timestamps) and the result?
>>>> What is the expected result (i.e., what aggregation do you apply)?
>>>> -Matthias
>>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>>>> Hi,
>>>>> After much debugging I found an issue with timestamp extractor.
>>>>> If I use a custom timestamp extractor with the following code:
>>>>>     public static class MessageTimestampExtractor implements TimestampExtractor {
>>>>>         public long extract(ConsumerRecord<Object, Object> record) {
>>>>>             if (record.value() instanceof Message) {
>>>>>                 return ((Message) record.value()).ts;
>>>>>             } else {
>>>>>                 return record.timestamp();
>>>>>             }
>>>>>         }
>>>>>     }
>>>>> Here Message has a long field ts which stores the timestamp. With this
>>>>> extractor, the aggregation does not work.
>>>>> Note that I have checked and ts has valid timestamp values.
>>>>> However, if I replace it with, say, WallclockTimestampExtractor, aggregation is
>>>>> working fine.
>>>>> I do not understand what could be the issue here.
>>>>> Also note I am using kafka streams version and I am publishing
>>>>> messages via
>>>>> whose version is quite old 0.5.x
>>>>> Let me know if there is some bug in timestamp extraction.
>>>>> Thanks
>>>>> Sachin
>>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang wrote:
>>>>>> Sachin,
>>>>>> This is indeed a bit weird, and we'd like to try to reproduce your issue
>>>>>> locally. Do you have sample input data for us to try out?
>>>>>> Guozhang
>>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> I fixed that sorted set issue, but I am facing a weird problem which I am
>>>>>>> not able to replicate.
>>>>>>> Here is the sample problem that I could isolate:
>>>>>>> My class is like this:
>>>>>>>     public static class Message implements Comparable<Message> {
>>>>>>>         public long ts;
>>>>>>>         public String message;
>>>>>>>         public String key;
>>>>>>>         public Message() {};
>>>>>>>         public Message(long ts, String message, String key) {
>>>>>>>             this.ts = ts;
>>>>>>>             this.key = key;
>>>>>>>             this.message = message;
>>>>>>>         }
>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>             long ts1 = paramT.ts;
>>>>>>>             return ts > ts1 ? 1 : -1;
>>>>>>>         }
>>>>>>>     }
>>>>>>> pipeline is like this:
>>>>>>>, messageSerde, "test-window-stream")\
>>>>>>>  .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>>>      public KeyValue<String, Message> apply(String key, Message value) {
>>>>>>>          return new KeyValue<String, Message>(value.key, value);
>>>>>>>      }
>>>>>>>  })
>>>>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>     public SortedSet<Message> apply() {
>>>>>>>         return new TreeSet<Message>();
>>>>>>>     }
>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>> SortedSet<Message> aggregate) {
>>>>>>>         aggregate.add(value);
>>>>>>>         return aggregate;
>>>>>>>     }
>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>> Serdes.String(), messagesSerde)
>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>         ...
>>>>>>>     }
>>>>>>> });
>>>>>>> So basically I rekey the original message into another topic and then
>>>>>>> aggregate it based on that key.
>>>>>>> What I have observed is that when I use windowed aggregation, the
>>>>>>> aggregator does not use the previously aggregated value.
>>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>> SortedSet<Message> aggregate) {
>>>>>>>     aggregate.add(value);
>>>>>>>     return aggregate;
>>>>>>> }
>>>>>>> So in the above function the aggregate is an empty set for every value
>>>>>>> entering the pipeline. When I remove the windowed aggregation, the
>>>>>>> aggregate set retains the previously aggregated values.
>>>>>>> I am just not able to wrap my head around it. When I ran this type of test
>>>>>>> locally on Windows it worked fine. However, a similar pipeline setup,
>>>>>>> when run against production on Linux, behaves strangely and always
>>>>>>> gets an empty aggregate set.
>>>>>>> Any idea what could be the reason, or where I should look for the problem?
>>>>>>> Does the length of the key string matter here? I will later try to run the
>>>>>>> same simple setup on Linux and see what happens. But this is very strange
>>>>>>> behavior.
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang wrote:
>>>>>>>> Hello Sachin,
>>>>>>>> In the implementation of SortedSet, if the object implements the
>>>>>>>> Comparable interface, its compareTo function is applied in
>>>>>>>> "aggregate.add(value);", and hence if it returns 0, the element will not be
>>>>>>>> added, since it is a Set.
>>>>>>>> Guozhang
>>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal wrote:
>>>>>>>>> Hi,
>>>>>>>>> What I find is that when I use a sorted set as the aggregate, it fails to
>>>>>>>>> aggregate the values for which compareTo returns 0.
>>>>>>>>> My class is like this:
>>>>>>>>>     public class Message implements Comparable<Message> {
>>>>>>>>>         public long ts;
>>>>>>>>>         public String message;
>>>>>>>>>         public Message() {};
>>>>>>>>>         public Message(long ts, String message) {
>>>>>>>>>             this.ts = ts;
>>>>>>>>>             this.message = message;
>>>>>>>>>         }
>>>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>>>             long ts1 = paramT.ts;
>>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>> pipeline is like this:
>>>>>>>>>, messageSerde, "test-window-stream")
>>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>     public SortedSet<Message> apply() {
>>>>>>>>>         return new TreeSet<Message>();
>>>>>>>>>     }
>>>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>> SortedSet<Message> aggregate) {
>>>>>>>>>         aggregate.add(value);
>>>>>>>>>         return aggregate;
>>>>>>>>>     }
>>>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>> Serdes.String(), messagesSerde)
>>>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>         ...
>>>>>>>>>     }
>>>>>>>>> });
>>>>>>>>> So any message published between 10 and 20 seconds gets aggregated in the
>>>>>>>>> 10 - 20 bucket, and I print the size of the set.
>>>>>>>>> However, the output I get is the following:
>>>>>>>>> Published: 14
>>>>>>>>> Aggregated: 10  20 -> 1
>>>>>>>>> Published: 18
>>>>>>>>> Aggregated: 10  20 -> 2
>>>>>>>>> Published: 11
>>>>>>>>> Aggregated: 10  20 -> 3
>>>>>>>>> Published: 17
>>>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>>> Published: 14
>>>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>>> Published: 15
>>>>>>>>> Aggregated: 10  20 -> 5
>>>>>>>>> Published: 12
>>>>>>>>> Aggregated: key2  10  20 -> 6
>>>>>>>>> Published: 12
>>>>>>>>> Aggregated: 10  20 -> 6
>>>>>>>>> So, as you can see, any message that occurs again for the same second, where
>>>>>>>>> compareTo returns 0, fails to get aggregated in the pipeline.
>>>>>>>>> Notice the ones published at 14 and 12 seconds.
>>>>>>>>> Now I am not sure if the problem is with Java, i.e., I should use the
>>>>>>>>> Comparator interface and not Comparable for my Message object, or whether
>>>>>>>>> the problem is with Kafka Streams or with serializing and deserializing the
>>>>>>>>> set of messages. If I replace Set with List, all is working fine.
>>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see what is
>>>>>>>>> the best Java practice here.
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll wrote:
>>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal wrote:
>>>>>>>>>>> I am using kafka_2.10-
>>>>>>>>>>> Say I am having a window of 60 minutes advanced by 15 minutes.
>>>>>>>>>>> If the stream app using timestamp extractor puts the message in one or
>>>>>>>>>>> more bucket(s), it will get aggregated in those buckets.
>>>>>>>>>>> I assume this statement is correct.
>>>>>>>>>> Yes.
>>>>>>>>>>> Also say when I restart the streams application then bucket aggregation
>>>>>>>>>>> will resume from last point of halt.
>>>>>>>>>>> I hope this is also correct.
>>>>>>>>>> Yes.
>>>>>>>>>>> What I noticed is that once a message is placed in one bucket, that
>>>>>>>>>>> bucket was not getting new messages.
>>>>>>>>>> This should not happen...
>>>>>>>>>>> However, when I ran a small test case replicating that, it is working
>>>>>>>>>>> properly. There may be some issues in application reset.
>>>>>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>>>>> Do you have any further information that you could share with us so we
>>>>>>>>>> can help you better?  What's the difference, for example, between your
>>>>>>>>>> "normal" use case and the small test case you have been referring to?
>>>>>>>>>> -Michael
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>> --
>>>>>> -- Guozhang
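
The SortedSet behaviour discussed earlier in the thread can be reproduced without Kafka. The following is a minimal sketch, assuming a simplified stand-in for the Message class (the names SortedSetDemo and Msg are hypothetical). A TreeSet treats two elements as duplicates when the comparison returns 0, so ordering on ts alone drops a second message with the same timestamp, while a tie-breaker on the message text keeps both.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

final class SortedSetDemo {
    // Simplified stand-in for the Message class from the thread.
    static final class Msg {
        final long ts;
        final String message;

        Msg(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }
    }

    // Orders by timestamp only: equal timestamps compare as 0 and count as duplicates.
    static final Comparator<Msg> BY_TS = Comparator.comparingLong((Msg m) -> m.ts);

    // Orders by timestamp, then by message text, so distinct messages stay distinct.
    static final Comparator<Msg> BY_TS_THEN_TEXT =
            BY_TS.thenComparing((Msg m) -> m.message);

    // Adds all messages to a TreeSet with the given ordering and reports its size.
    static int sizeAfterAdding(Comparator<Msg> order, Msg... msgs) {
        SortedSet<Msg> set = new TreeSet<>(order);
        for (Msg m : msgs) {
            set.add(m);
        }
        return set.size();
    }

    public static void main(String[] args) {
        Msg a = new Msg(14, "first");
        Msg b = new Msg(14, "second");
        System.out.println(sizeAfterAdding(BY_TS, a, b));           // prints 1
        System.out.println(sizeAfterAdding(BY_TS_THEN_TEXT, a, b)); // prints 2
    }
}
```

With the tie-breaker, two messages that share both timestamp and text still collapse into one element. A compareTo that returns only 1 or -1 avoids the collapse but no longer satisfies the Comparable contract, since x.compareTo(y) and y.compareTo(x) can then have the same sign.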
