Can you provide example input data (including timetamps) and result.
What is the expected result (ie, what aggregation do you apply)?


-Matthias

On 12/2/16 7:43 AM, Sachin Mittal wrote:
> Hi,
> After much debugging I found an issue with timestamp extractor.
> 
> If I use a custom timestamp extractor with following code:
>     public static class MessageTimestampExtractor implements
> TimestampExtractor {
>         public long extract(ConsumerRecord<Object, Object> record) {
>             if (record.value() instanceof Message) {
>                 return ((Message) record.value()).ts;
>             } else {
>                 return record.timestamp();
>             }
>         }
>     }
> 
> Here message has a long field ts which stores the timestamp, the
> aggregation does not work.
> Note I have checked and ts has valid timestamp values.
> 
> However if I replace it with say WallclockTimestampExtractor aggregation is
> working fine.
> 
> I do not understand what could be the issue here.
> 
> Also note I am using kafka streams version 0.10.0.1 and I am publishing
> messages via
> https://github.com/SOHU-Co/kafka-node/ whose version is quite old 0.5.x
> 
> Let me know if there is some bug in time stamp extractions.
> 
> Thanks
> Sachin
> 
> 
> 
> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Sachin,
>>
>> This is indeed a bit wired, and we'd like to try to re-produce your issue
>> locally. Do you have a sample input data for us to try out?
>>
>> Guozhang
>>
>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I fixed that sorted set issue but I am facing a weird problem which I am
>>> not able to replicate.
>>>
>>> Here is the sample problem that I could isolate:
>>> My class is like this:
>>>     public static class Message implements Comparable<Message> {
>>>         public long ts;
>>>         public String message;
>>>         public String key;
>>>         public Message() {};
>>>         public Message(long ts, String message, String key) {
>>>             this.ts = ts;
>>>             this.key = key;
>>>             this.message = message;
>>>         }
>>>         public int compareTo(Message paramT) {
>>>             long ts1 = paramT.ts;
>>>             return ts > ts1 ? 1 : -1;
>>>         }
>>>     }
>>>
>>> pipeline is like this:
>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")\
>>>  .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>      public KeyValue<String, Message> apply(String key, Message value) {
>>>          return new KeyValue<String, Message>(value.key, value);
>>>       }
>>>  })
>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>     public SortedSet<Message> apply() {
>>>         return new TreeSet<Message>();
>>>     }
>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>> SortedSet<Message> aggregate) {
>>>         aggregate.add(value);
>>>         return aggregate;
>>>     }
>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>> Serdes.String(), messagesSerde)
>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>     public void apply(Windowed<String> key, SortedSet<Message> messages)
>> {
>>>         ...
>>>     }
>>> });
>>>
>>> So basically I rekey the original message into another topic and then
>>> aggregate it based on that key.
>>> What I have observed is that when I used windowed aggregation the
>>> aggregator does not use previous aggregated value.
>>>
>>> public SortedSet<Message> apply(String aggKey, Message value,
>>> SortedSet<Message> aggregate) {
>>>     aggregate.add(value);
>>>     return aggregate;
>>> }
>>>
>>> So in the above function the aggregate is an empty set of every value
>>> entering into pipeline. When I remove the windowed aggregation, the
>>> aggregate set retains previously aggregated values in the set.
>>>
>>> I am just not able to wrap my head around it. When I ran this type of
>> test
>>> locally on windows it is working fine. However a similar pipeline setup
>>> when run against production on linux is behaving strangely and always
>>> getting an empty aggregate set.
>>> Any idea what could be the reason, where should I look at the problem.
>> Does
>>> length of key string matters here? I will later try to run the same
>> simple
>>> setup on linux and see what happens. But this is a very strange behavior.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Sachin,
>>>>
>>>> In the implementation of SortedSet, if the object's implemented the
>>>> Comparable interface, that compareTo function is applied in "
>>>> aggregate.add(value);", and hence if it returns 0, this element will
>> not
>>> be
>>>> added since it is a Set.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> What I find is that when I use sorted set as aggregation it fails to
>>>>> aggregate the values which have compareTo returning 0.
>>>>>
>>>>> My class is like this:
>>>>>     public class Message implements Comparable<Message> {
>>>>>         public long ts;
>>>>>         public String message;
>>>>>         public Message() {};
>>>>>         public Message(long ts, String message) {
>>>>>             this.ts = ts;
>>>>>             this.message = message;
>>>>>         }
>>>>>         public int compareTo(Message paramT) {
>>>>>             long ts1 = paramT.ts;
>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>         }
>>>>>     }
>>>>>
>>>>> pipeline is like this:
>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>     public SortedSet<Message> apply() {
>>>>>         return new TreeSet<Message>();
>>>>>     }
>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>> SortedSet<Message> aggregate) {
>>>>>         aggregate.add(value);
>>>>>         return aggregate;
>>>>>     }
>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>> Serdes.String(), messagesSerde)
>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>     public void apply(Windowed<String> key, SortedSet<Message>
>>> messages)
>>>> {
>>>>>         ...
>>>>>     }
>>>>> });
>>>>>
>>>>> So any message published between 10 and 20 seconds gets aggregated in
>>> 10
>>>> -
>>>>> 20 bucket and I print the size of the set.
>>>>> However output I get is following:
>>>>>
>>>>> Published: 14
>>>>> Aggregated: 10  20 -> 1
>>>>>
>>>>> Published: 18
>>>>> Aggregated: 10  20 -> 2
>>>>>
>>>>> Published: 11
>>>>> Aggregated: 10  20 -> 3
>>>>>
>>>>> Published: 17
>>>>> Aggregated: 10  20 -> 4
>>>>>
>>>>> Published: 14
>>>>> Aggregated: 10  20 -> 4
>>>>>
>>>>> Published: 15
>>>>> Aggregated: 10  20 -> 5
>>>>>
>>>>> Published: 12
>>>>> Aggregated: key2  10  20 -> 6
>>>>>
>>>>> Published: 12
>>>>> Aggregated: 10  20 -> 6
>>>>>
>>>>> So if you see any message that occurs again for same second, where
>>>>> compareTo returns 0, it fails to get aggregated in the pipeline.
>>>>> Notice ones published at 14 and 12 seconds.
>>>>>
>>>>> Now I am not sure if problem is with Java ie I should use Comparator
>>>>> interface and not Comparable for my Message object. Or the problem is
>>>> with
>>>>> Kafka stream or with serializing and de-serializing the set of
>>> messages.
>>>> If
>>>>> I replace Set with List all is working fine.
>>>>>
>>>>> Anyway any ideas here would be appreciated, meanwhile let me see what
>>> is
>>>>> the best java practice here.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com
>>>
>>>>> wrote:
>>>>>>
>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>> Say I am having a window of 60 minutes advanced by 15 minutes.
>>>>>>> If the stream app using timestamp extractor puts the message in
>> one
>>>> or
>>>>>> more
>>>>>>> bucket(s), it will get aggregated in those buckets.
>>>>>>> I assume this statement is correct.
>>>>>>>
>>>>>>
>>>>>> Yes.
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Also say when I restart the streams application then bucket
>>>> aggregation
>>>>>>> will resume from last point of halt.
>>>>>>> I hope this is also correct.
>>>>>>>
>>>>>>
>>>>>> Yes.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> What I noticed that once a message is placed in one bucket, that
>>>> bucket
>>>>>> was
>>>>>>> not getting new messages.
>>>>>>>
>>>>>>
>>>>>> This should not happen...
>>>>>>
>>>>>>
>>>>>>> However when I ran a small test case replicating that, it is
>>> working
>>>>>>> properly. There maybe some issues in application reset.
>>>>>>>
>>>>>>
>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>
>>>>>> Do you have any further information that you could share with us so
>>> we
>>>>> can
>>>>>> help you better?  What's the difference, for example, between your
>>>>> "normal"
>>>>>> use case and the small test case you have been referring to?
>>>>>>
>>>>>>
>>>>>> -Michael
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to