I am not sure what is happening. That's why it would be good to have a
toy example to reproduce the issue.
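
A toy example for this could start without any Kafka dependency at all.
The sketch below is plain Java and only illustrates how an extracted
timestamp maps onto hopping windows of 10 seconds advancing by 5 seconds,
as in the pipeline quoted further down. The class name WindowToy, both
method names, and the -1 "no timestamp" sentinel are assumptions made for
illustration; the window arithmetic is a simplification and not the Kafka
Streams implementation. One thing it makes visible: if ts were in seconds
rather than milliseconds (a guess, not something confirmed in this
thread), every record would fall into the single window starting at 0.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowToy {
    // Assumed sentinel for "the producer did not set a timestamp".
    public static final long NO_TIMESTAMP = -1L;

    // Hypothetical stand-in for the custom extractor: prefer the payload's
    // ts field when present, otherwise fall back to the record timestamp.
    public static long extract(Long payloadTs, long recordTs) {
        return payloadTs != null ? payloadTs : recordTs;
    }

    // Start times of all hopping windows [start, start + sizeMs) that
    // contain timestampMs. Negative timestamps match no window here.
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        if (timestampMs < 0) {
            return starts;
        }
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10 * 1000L;
        long advance = 5 * 1000L;
        // ts in milliseconds: two overlapping windows
        System.out.println(windowStartsFor(extract(14000L, NO_TIMESTAMP), size, advance)); // [5000, 10000]
        // ts accidentally in seconds: everything lands in the window starting at 0
        System.out.println(windowStartsFor(extract(14L, NO_TIMESTAMP), size, advance));    // [0]
        // no payload ts and no record timestamp: no window at all
        System.out.println(windowStartsFor(extract(null, NO_TIMESTAMP), size, advance));   // []
    }
}
```

Feeding the actual ts values from the failing records through
windowStartsFor would show quickly whether they land in the expected
windows.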

What do you mean by "Kafka node version 0.5"?

-Matthias

On 12/2/16 11:30 AM, Sachin Mittal wrote:
> I can provide the data, but the data does not seem to be the issue.
> If I submit the same data and use the same timestamp extractor using the Java
> client with Kafka version 0.10.0.1, aggregation works fine.
> I see the issue only when submitting the data with kafka node version 0.5.
> It looks like the stream does not extract the time correctly in that case.
> 
> Thanks
> Sachin
> 
> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
> 
>> Can you provide example input data (including timestamps) and the result?
>> What is the expected result (i.e., what aggregation do you apply)?
>>
>>
>> -Matthias
>>
>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>> Hi,
>>> After much debugging I found an issue with timestamp extractor.
>>>
>>> If I use a custom timestamp extractor with following code:
>>>     public static class MessageTimestampExtractor implements
>>> TimestampExtractor {
>>>         public long extract(ConsumerRecord<Object, Object> record) {
>>>             if (record.value() instanceof Message) {
>>>                 return ((Message) record.value()).ts;
>>>             } else {
>>>                 return record.timestamp();
>>>             }
>>>         }
>>>     }
>>>
>>> Here Message has a long field ts which stores the timestamp. With this
>>> extractor, the aggregation does not work.
>>> Note I have checked and ts has valid timestamp values.
>>>
>>> However if I replace it with say WallclockTimestampExtractor aggregation
>>> is working fine.
>>>
>>> I do not understand what could be the issue here.
>>>
>>> Also note I am using kafka streams version 0.10.0.1 and I am publishing
>>> messages via
>>> https://github.com/SOHU-Co/kafka-node/ whose version is quite old 0.5.x
>>>
>>> Let me know if there is some bug in time stamp extractions.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Sachin,
>>>>
>>>> This is indeed a bit weird, and we'd like to try to reproduce your issue
>>>> locally. Do you have sample input data for us to try out?
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I fixed that sorted set issue but I am facing a weird problem which I am
>>>>> not able to replicate.
>>>>>
>>>>> Here is the sample problem that I could isolate:
>>>>> My class is like this:
>>>>>     public static class Message implements Comparable<Message> {
>>>>>         public long ts;
>>>>>         public String message;
>>>>>         public String key;
>>>>>         public Message() {};
>>>>>         public Message(long ts, String message, String key) {
>>>>>             this.ts = ts;
>>>>>             this.key = key;
>>>>>             this.message = message;
>>>>>         }
>>>>>         public int compareTo(Message paramT) {
>>>>>             long ts1 = paramT.ts;
>>>>>             return ts > ts1 ? 1 : -1;
>>>>>         }
>>>>>     }
>>>>>
>>>>> pipeline is like this:
>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>  .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>      public KeyValue<String, Message> apply(String key, Message value) {
>>>>>          return new KeyValue<String, Message>(value.key, value);
>>>>>       }
>>>>>  })
>>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>     public SortedSet<Message> apply() {
>>>>>         return new TreeSet<Message>();
>>>>>     }
>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>> SortedSet<Message> aggregate) {
>>>>>         aggregate.add(value);
>>>>>         return aggregate;
>>>>>     }
>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>> Serdes.String(), messagesSerde)
>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>         ...
>>>>>     }
>>>>> });
>>>>>
>>>>> So basically I rekey the original message into another topic and then
>>>>> aggregate it based on that key.
>>>>> What I have observed is that when I used windowed aggregation the
>>>>> aggregator does not use previous aggregated value.
>>>>>
>>>>> public SortedSet<Message> apply(String aggKey, Message value,
>>>>> SortedSet<Message> aggregate) {
>>>>>     aggregate.add(value);
>>>>>     return aggregate;
>>>>> }
>>>>>
>>>>> So in the above function the aggregate is an empty set for every value
>>>>> entering the pipeline. When I remove the windowed aggregation, the
>>>>> aggregate set retains previously aggregated values in the set.
>>>>>
>>>>> I am just not able to wrap my head around it. When I ran this type of
>>>>> test locally on Windows it worked fine. However, a similar pipeline
>>>>> setup run against production on Linux behaves strangely and always
>>>>> gets an empty aggregate set.
>>>>> Any idea what could be the reason, and where I should look for the
>>>>> problem? Does the length of the key string matter here? I will later try
>>>>> to run the same simple setup on Linux and see what happens. But this is
>>>>> very strange behavior.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Sachin,
>>>>>>
>>>>>> In the implementation of SortedSet, if the object implements the
>>>>>> Comparable interface, its compareTo function is applied in
>>>>>> "aggregate.add(value);", and hence if it returns 0, this element will
>>>>>> not be added since it is a Set.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> What I find is that when I use sorted set as aggregation it fails to
>>>>>>> aggregate the values which have compareTo returning 0.
>>>>>>>
>>>>>>> My class is like this:
>>>>>>>     public class Message implements Comparable<Message> {
>>>>>>>         public long ts;
>>>>>>>         public String message;
>>>>>>>         public Message() {};
>>>>>>>         public Message(long ts, String message) {
>>>>>>>             this.ts = ts;
>>>>>>>             this.message = message;
>>>>>>>         }
>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>             long ts1 = paramT.ts;
>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> pipeline is like this:
>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>     public SortedSet<Message> apply() {
>>>>>>>         return new TreeSet<Message>();
>>>>>>>     }
>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>> SortedSet<Message> aggregate) {
>>>>>>>         aggregate.add(value);
>>>>>>>         return aggregate;
>>>>>>>     }
>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>> Serdes.String(), messagesSerde)
>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>         ...
>>>>>>>     }
>>>>>>> });
>>>>>>>
>>>>>>> So any message published between 10 and 20 seconds gets aggregated in
>>>>>>> the 10 - 20 bucket and I print the size of the set.
>>>>>>> However, the output I get is the following:
>>>>>>>
>>>>>>> Published: 14
>>>>>>> Aggregated: 10  20 -> 1
>>>>>>>
>>>>>>> Published: 18
>>>>>>> Aggregated: 10  20 -> 2
>>>>>>>
>>>>>>> Published: 11
>>>>>>> Aggregated: 10  20 -> 3
>>>>>>>
>>>>>>> Published: 17
>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>
>>>>>>> Published: 14
>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>
>>>>>>> Published: 15
>>>>>>> Aggregated: 10  20 -> 5
>>>>>>>
>>>>>>> Published: 12
>>>>>>> Aggregated: key2  10  20 -> 6
>>>>>>>
>>>>>>> Published: 12
>>>>>>> Aggregated: 10  20 -> 6
>>>>>>>
>>>>>>> So, as you can see, any message that occurs again for the same second,
>>>>>>> where compareTo returns 0, fails to get aggregated in the pipeline.
>>>>>>> Notice the ones published at 14 and 12 seconds.
>>>>>>>
>>>>>>> Now I am not sure if the problem is with Java, i.e. I should use the
>>>>>>> Comparator interface and not Comparable for my Message object, or if
>>>>>>> the problem is with Kafka Streams or with serializing and
>>>>>>> de-serializing the set of messages. If I replace Set with List all is
>>>>>>> working fine.
>>>>>>>
>>>>>>> Anyway, any ideas here would be appreciated. Meanwhile let me see what
>>>>>>> the best Java practice is here.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>>>> Say I am having a window of 60 minutes advanced by 15 minutes.
>>>>>>>>> If the stream app using timestamp extractor puts the message in one
>>>>>>>>> or more bucket(s), it will get aggregated in those buckets.
>>>>>>>>> I assume this statement is correct.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Also say when I restart the streams application then bucket
>>>>>>>>> aggregation will resume from last point of halt.
>>>>>>>>> I hope this is also correct.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> What I noticed is that once a message is placed in one bucket, that
>>>>>>>>> bucket was not getting new messages.
>>>>>>>>>
>>>>>>>>
>>>>>>>> This should not happen...
>>>>>>>>
>>>>>>>>
>>>>>>>>> However when I ran a small test case replicating that, it is working
>>>>>>>>> properly. There may be some issues in application reset.
>>>>>>>>>
>>>>>>>>
>>>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>>>
>>>>>>>> Do you have any further information that you could share with us so
>>>>>>>> we can help you better?  What's the difference, for example, between
>>>>>>>> your "normal" use case and the small test case you have been
>>>>>>>> referring to?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Michael
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
> 

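The SortedSet behaviour discussed earlier in this thread can be reproduced
without Kafka. The sketch below is plain Java; Msg is a simplified stand-in
for the Message class quoted above, and SortedSetToy, the comparator
constants, and the helper methods are names invented for illustration. It
shows three orderings: comparing by ts only (equal timestamps are treated
as duplicates and dropped), a compare that never returns 0 (elements are
kept, but lookups such as contains stop working because the ordering is
inconsistent), and ts with a tie-breaker on the message text.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class SortedSetToy {
    // Simplified stand-in for the Message class in the thread.
    public static final class Msg {
        public final long ts;
        public final String message;

        public Msg(long ts, String message) {
            this.ts = ts;
            this.message = message;
        }
    }

    // Orders by timestamp only: two messages with the same ts compare as equal.
    public static final Comparator<Msg> BY_TS_ONLY =
            Comparator.comparingLong((Msg m) -> m.ts);

    // Never returns 0, so even compare(x, x) is non-zero.
    public static final Comparator<Msg> NEVER_EQUAL =
            (a, b) -> a.ts > b.ts ? 1 : -1;

    // Orders by timestamp, then by message text as a tie-breaker.
    public static final Comparator<Msg> BY_TS_THEN_TEXT =
            Comparator.comparingLong((Msg m) -> m.ts)
                      .thenComparing((Msg m) -> m.message);

    // Adds all messages to a TreeSet with the given ordering and returns its size.
    public static int sizeAfterAdding(Comparator<Msg> order, Msg... msgs) {
        SortedSet<Msg> set = new TreeSet<>(order);
        for (Msg m : msgs) {
            set.add(m);
        }
        return set.size();
    }

    // Adds one message and asks the set whether it contains that same object.
    public static boolean containsAfterAdd(Comparator<Msg> order, Msg m) {
        SortedSet<Msg> set = new TreeSet<>(order);
        set.add(m);
        return set.contains(m);
    }

    public static void main(String[] args) {
        Msg a = new Msg(14L, "a");
        Msg b = new Msg(18L, "b");
        Msg c = new Msg(14L, "c"); // same ts as a
        System.out.println(sizeAfterAdding(BY_TS_ONLY, a, b, c));      // 2
        System.out.println(sizeAfterAdding(NEVER_EQUAL, a, b, c));     // 3
        System.out.println(containsAfterAdd(NEVER_EQUAL, a));          // false
        System.out.println(sizeAfterAdding(BY_TS_THEN_TEXT, a, b, c)); // 3
    }
}
```

With the tie-breaker, two messages are dropped only if both ts and message
text are equal. Whether that is the right notion of a duplicate depends on
the application, and a List remains the simpler choice when duplicates
must always be kept.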