Just increase the retention time so the window is not dropped and can
accept later arriving data.
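
To illustrate why a longer retention helps with out-of-order data, here is
a minimal sketch in plain Java. It is a simplified model, not Kafka Streams
code: the class and method names are made up for illustration, and the rule
"a window is available while stream time < window start + retention" is an
assumption based on the description of until() further down in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of hopping windows with a retention period.
// Not Kafka Streams code; all names here are hypothetical.
final class RetentionSketch {

    // Start times of all hopping windows [start, start + sizeMs) that contain the timestamp.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Assumed rule: a window is still available while streamTime < windowStart + retentionMs.
    static boolean stillRetained(long windowStart, long retentionMs, long streamTime) {
        return streamTime < windowStart + retentionMs;
    }

    public static void main(String[] args) {
        long size = 60_000L;          // 1 minute windows
        long advance = 30_000L;       // advancing by 30 seconds
        long streamTime = 200_000L;   // highest timestamp seen so far
        long lateRecord = 50_000L;    // record that arrives out of order

        for (long start : windowStartsFor(lateRecord, size, advance)) {
            System.out.println("window " + start
                    + ": retention 2 min -> " + stillRetained(start, 120_000L, streamTime)
                    + ", retention 10 min -> " + stillRetained(start, 600_000L, streamTime));
        }
    }
}
```

In this model, with a 2 minute retention both windows for the late record
are already gone at stream time 200000, while with 10 minutes both are still
available and the record can be aggregated into them.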

About your example: the retention time specified via until() is a minimum
retention time! A window can be kept longer than that.
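
One way a "minimum" retention can come about is if windows are stored in
segments and only whole segments are deleted, as described further down in
this thread. The following plain Java toy model sketches that idea. It is
not Kafka Streams code: the segment size, the names, and the exact drop rule
are assumptions chosen for illustration, and the real store differs in
detail.

```java
// Toy model of segment-based expiry. Not Kafka Streams code; names are hypothetical.
final class SegmentExpirySketch {

    // Segment that a window belongs to, determined by its start time.
    static long segmentId(long windowStart, long segmentIntervalMs) {
        return windowStart / segmentIntervalMs;
    }

    // Assumed rule: a segment is dropped only once even the latest window start
    // it can hold (the last millisecond of the segment) is past the retention.
    static boolean segmentDropped(long segmentId, long segmentIntervalMs,
                                  long retentionMs, long streamTime) {
        long lastStartInSegment = (segmentId + 1) * segmentIntervalMs - 1;
        return streamTime >= lastStartInSegment + retentionMs;
    }

    public static void main(String[] args) {
        long retention = 120_000L;        // as in until(2 * 60 * 1000L)
        long segmentInterval = 60_000L;   // hypothetical segment size
        long windowStart = 0L;            // the window [0, 60000)

        long id = segmentId(windowStart, segmentInterval);
        for (long streamTime = 100_000L; streamTime <= 180_000L; streamTime += 20_000L) {
            boolean pastRetention = streamTime >= windowStart + retention;
            boolean dropped = segmentDropped(id, segmentInterval, retention, streamTime);
            System.out.println("streamTime=" + streamTime
                    + " pastRetention=" + pastRetention
                    + " actuallyDropped=" + dropped);
        }
    }
}
```

In this toy model, the window starting at 0 is past its 2 minute retention
from stream time 120000 on, but it only disappears once its whole segment is
dropped, shortly before 180000. Until then it can still accept late records.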


-Matthias

On 12/12/16 11:49 PM, Sachin Mittal wrote:
> Hi,
> Well, it does help in the case you mentioned. But suppose that on 2017 Dec
> 12 12:01 AM we receive a message stamped 2017 Dec 11 11:59 PM. It will
> either drop this message, or create a fresh older window, aggregate the
> message into it, and then drop the window.
> It is not clear which of the two it will do. But both are wrong: ideally it
> should have aggregated that message into the previous aggregation and not
> started a fresh older aggregation (since on Dec 12 12:00 AM we drop older
> windows and create fresh ones).
> 
> Could you please explain this case.
> 
> I am trying to reproduce this scenario and have written a small Java
> program which runs against the latest Kafka source, built against trunk at
> git commit 01d58ad8e039181ade742cf896a08199e3cb7483.
>
> Here I am publishing messages with timestamps
> TS, TS + 5, TS + 1, TS + 6, TS + 2, TS + 7, TS + 3, TS + 8, TS + 4, TS + 9,
> TS + 5 ...
> I hope you get the idea: TS is generally increasing, but the next TS can
> have a value less than the previous one.
> 
> My window is
> TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 * 1000L)
> i.e. a 1 minute window advancing by 30 seconds, retained until 2 minutes,
> after which we discard the old windows and create new ones.
>
> What I observe is that it always aggregates the result in the first bucket
> it creates, even after the until timestamp has elapsed. So I am kind of
> confused here.
>
> See if you can give me some insight into rolling windows. The code is
> attached below.
> 
> 
> Thanks
> Sachin
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> 
> import java.io.ByteArrayOutputStream;
> import java.util.Date;
> import java.util.Map;
> import java.util.Properties;
> import java.util.SortedSet;
> import java.util.TreeSet;
> 
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.errors.SerializationException;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.Serializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Aggregator;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.Initializer;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.TimeWindows;
> import org.apache.kafka.streams.kstream.Windowed;
> import org.apache.kafka.streams.processor.TimestampExtractor;
> 
> import com.fasterxml.jackson.core.type.TypeReference;
> import com.fasterxml.jackson.databind.ObjectMapper;
> 
> public class TestKafkaWindowStream {
> 
>     public static void main(String[] args) {
>         //start the producer
>         Producer producerThread = new Producer();
>         producerThread.start();
>         //aggregate the messages via stream
>         final Serde<Message> messageSerde = Serdes.serdeFrom(new
> MessageSerializer(), new MessageDeserializer());
>         final Serde<SortedSet<Message>> messagesSerde =
> Serdes.serdeFrom(new Serializer<SortedSet<Message>>() {
>             private ObjectMapper objectMapper = new ObjectMapper();
>             public void close() {}
>             public void configure(Map<String, ?> paramMap, boolean
> paramBoolean) {}
>             public byte[] serialize(String paramString, SortedSet<Message>
> messages) {
>                 if (messages == null) {
>                     return null;
>                 }
>                 try {
>                     ByteArrayOutputStream out = new ByteArrayOutputStream();
>                     objectMapper.writeValue(out, messages);
>                     return out.toByteArray();
>                 } catch (Exception e) {
>                     throw new SerializationException(
>                             "Error serializing JSON message", e);
>                 }
>             }
>         }, new Deserializer<SortedSet<Message>>() {
>             private ObjectMapper objectMapper = new ObjectMapper();
>             public void close() {}
>             public void configure(Map<String, ?> paramMap, boolean
> paramBoolean) {}
>             public SortedSet<Message> deserialize(String paramString,
> byte[] paramArrayOfByte) {
>                 if (paramArrayOfByte == null) {
>                     return null;
>                 }
>                 SortedSet<Message> data = null;
>                 try {
>                     data = objectMapper.readValue(paramArrayOfByte, new
> TypeReference<TreeSet<Message>>() {});
>                 } catch (Exception e) {
>                     throw new SerializationException(
>                             "Error deserializing JSON message", e);
>                 }
>                 return data;
>             }
>         });
>         //build the stream
>         KStreamBuilder builder = new KStreamBuilder();
>         builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>         .groupByKey()
>         .aggregate(new Initializer<SortedSet<Message>>() {
>             public SortedSet<Message> apply() {
>                 return new TreeSet<Message>();
>             }
>         }, new Aggregator<String, Message, SortedSet<Message>>() {
>             public SortedSet<Message> apply(String aggKey, Message value,
> SortedSet<Message> aggregate) {
>                 aggregate.add(value);
>                 return aggregate;
>             }
>         }, TimeWindows.of(60 * 1000L).advanceBy(30 * 1000L).until(2 * 60 *
> 1000L), messagesSerde, "stream-table")
>         .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>             public void apply(Windowed<String> key, SortedSet<Message>
> messages) {
>                 if("2".equals(key.key())) {
>                     Date start = new Date(key.window().start());
>                     Date end = new Date(key.window().end());
>                     System.out.println("Aggregated: "
>                             + start.getMinutes() + ":" + start.getSeconds()
>                             + " " + end.getMinutes() + ":" + end.getSeconds()
>                             + " -> " + messages.size());
>                 }
>             }
>         });
>         //configure and start the stream
>         Properties streamsProps = new Properties();
>         streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         streamsProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> "localhost:2181");
>         streamsProps.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.Serdes$StringSerde");
>         streamsProps.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> MessageTimestampExtractor.class);
> //        streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         KafkaStreams streams = new KafkaStreams(builder, streamsProps);
>         streams.start();
>     }
> 
>     public static class MessageSerializer implements Serializer<Message> {
>         private ObjectMapper objectMapper = new ObjectMapper();
>         public void configure(Map paramMap, boolean paramBoolean) {}
>         public byte[] serialize(String paramString, Message message) {
>             if (message == null) {
>                 return null;
>             }
>             try {
>                 ByteArrayOutputStream out = new ByteArrayOutputStream();
>                 objectMapper.writeValue(out, message);
>                 return out.toByteArray();
>             } catch (Exception e) {
>                 throw new SerializationException(
>                         "Error serializing JSON message", e);
>             }
>         }
>         public void close() {}
>     }
> 
>     public static class MessageDeserializer implements
> Deserializer<Message> {
>         private ObjectMapper objectMapper = new ObjectMapper();
>         public void configure(Map paramMap, boolean paramBoolean) {}
>         public Message deserialize(String paramString, byte[]
> paramArrayOfByte) {
>             if (paramArrayOfByte == null) {
>                 return null;
>             }
>             Message data = null;
>             try {
>                 data = objectMapper.readValue(paramArrayOfByte, new
> TypeReference<Message>() {});
>             } catch (Exception e) {
>                 throw new SerializationException(
>                         "Error deserializing JSON message", e);
>             }
>             return data;
>         }
>         public void close() {}
>     }
> 
>     public static class MessageTimestampExtractor implements
> TimestampExtractor {
>         public long extract(ConsumerRecord<Object, Object> record) {
>             if (record.value() instanceof Message) {
>                 return ((Message) record.value()).ts;
>             } else {
>                 return record.timestamp();
>             }
>         }
>     }
> 
>     public static class Message implements Comparable<Message> {
>         public long ts;
>         public String message;
>         public Message() {};
>         public Message(long ts, String message) {
>             this.ts = ts; // timestamp read by MessageTimestampExtractor
>             this.message = message;
>         }
>         public int compareTo(Message paramT) {
>             long ts1 = paramT.ts;
>             return ts > ts1 ? 1 : -1;
>         }
>         public String toString() {
>             return "[" + message + "]";
>         }
>     }
> }
> 
> class Producer extends Thread {
>     private KafkaProducer<String, TestKafkaWindowStream.Message> producer;
> 
>     public Producer() {
>         Properties producerProps = new Properties();
>         producerProps.put("bootstrap.servers", "localhost:9092");
>         producerProps.put("client.id", "DemoProducer");
>         producerProps.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>         producerProps.put("value.serializer",
> "TestKafkaWindowStream$MessageSerializer");
>         producer = new KafkaProducer<String,
> TestKafkaWindowStream.Message>(producerProps);
>     }
> 
>     public void run() {
>         int nextKey = 2;
>         int count = 0;
>         long ts = System.currentTimeMillis();
>         while (true) {
>             Date date = new Date(ts);
>             String messageStr = nextKey +  " " + count;
>             TestKafkaWindowStream.Message m = new
> TestKafkaWindowStream.Message(ts, messageStr);
>             try {
>                 producer.send(new ProducerRecord<String,
> TestKafkaWindowStream.Message>("test-window-stream", ""+nextKey, m)).get();
>                 if(2 == nextKey) {
>                     System.out.println("Published: " + date.getMinutes() +
> " " + date.getSeconds());
>                 }
>                 Thread.sleep(1000);
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             count++;
>             ts = ts + (count % 2 != 0 ? 5 * 1000l :  - 4 * 1000l);
>         }
>     }
> }
> 
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> 
> 
> On Tue, Dec 13, 2016 at 4:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hi Sachin,
>>
>> Note that "until" means that the window will be retained for that period of
>> time after the window start time. So when you set the time to 1 year, if
>> there is a message whose timestamp is 1 year + 1 sec beyond the "current
>> stream time", then yes, it will cause the window to be dropped. But in
>> practice, if you are confident that you are unlikely to receive a message
>> stamped 2017.Dec.12 (from your use case it seems possible that different
>> sources' clocks can be shifted by a bit, but not by as much as a year,
>> right?), then it still helps with the problem.
>>
>>
>> Guozhang
>>
>>
>> On Fri, Dec 9, 2016 at 8:57 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I think the window retention period does not solve the problem, it only
>>> delays it.
>>> Based on what I understand, say I set the time to 1 year using until.
>>> Then when I get a message with timestamp 1 year + 1 sec, it will delete
>>> the old windows and create new ones from that message.
>>> Now let us say we get the next message with timestamp 1 year - 1 sec. Based
>>> on what you said, it will ignore this message.
>>>
>>> In my case we get messages from different sources whose clocks are not in
>>> sync. So overall messages come with increasing timestamps, but over a short
>>> duration there is no order guarantee.
>>>
>>> So I think before deleting the older windows it should retain a small
>>> portion of the old windows too, so nearby older messages are not dropped.
>>>
>>> I suggest having something like windows.size.advanceBy.until.retain
>>> Retain will keep the periods which fall within retain ms of the upper
>>> bound.
>>>
>>> So a window can be defined as
>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
>>> 1000l).until(365 * 24 * 3600 * 1000l).retain(900 * 1000l)
>>> So when dropping older windows it will retain the ones that fall in the
>>> last 15 minutes.
>>>
>>>
>>> Please let me know in case I missed something on how and if at all older
>>> messages are dropped.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Dec 10, 2016 at 5:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Assuming your window retention period is the same as the window length,
>>>> then it is true that ZZ will cause the current window to be dropped. And
>>>> then when ZZA is received, it will not cause the old windows to be
>>>> re-created but will be ignored, since it is considered "expired".
>>>>
>>>> Note that you can set the window retention period much longer than the
>>>> window length itself, using the "until" API I mentioned above, to handle
>>>> any sudden future records.
>>>>
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Dec 8, 2016 at 8:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Right now in order to circumvent this problem I am using a timestamp
>>>>> whose values increase by a few ms as and when I get new records.
>>>>> So let's say I have records in order
>>>>> A -> lower limit TS + 1 sec
>>>>> B -> lower limit TS + 3 sec
>>>>> C -> lower limit TS + 5 sec
>>>>> ..
>>>>> Z -> upper limit TS - 1 sec
>>>>>
>>>>> Now say I get a record ZZ with ts upper limit TS + 1 sec. I assume it
>>>>> will drop the previous windows and create new ones based on this
>>>>> timestamp.
>>>>> Please confirm this understanding.
>>>>>
>>>>> Now let's say I get a new record ZZA with timestamp (old) upper limit TS
>>>>> - 1 sec. Will this again cause the new windows to be dropped, and the
>>>>> older windows to be recreated fresh, with all the older aggregation done
>>>>> so far lost?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 9, 2016 at 12:16 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hello Sachin,
>>>>>>
>>>>>> I am with you that ideally the windowing segmentation implementation
>>>>>> should be totally abstracted from users, but today it is a bit
>>>>>> confusing to understand. I filed a JIRA some time ago to improve on
>>>>>> this end:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/KAFKA-3596
>>>>>>
>>>>>> So to your example, if a "far future record" was received whose
>>>>>> timestamp is beyond current time + the retention period, it could
>>>>>> potentially cause the current window to be dropped.
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Fri, Dec 2, 2016 at 10:07 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I think now it all makes sense. The field I was using for the timestamp
>>>>>>> extractor contains timestamps which span more than a day's duration,
>>>>>>> and it worked for wall clock because over a short duration the
>>>>>>> timestamps were within a day's range.
>>>>>>>
>>>>>>> I wanted to understand one thing:
>>>>>>> Say I have a timestamp extractor field, and as records get ingested,
>>>>>>> future records will have increasing values for the timestamp.
>>>>>>>
>>>>>>> Now let's say the default duration is one day. At a future time a record
>>>>>>> will have a timestamp which is now greater than the initial day's range.
>>>>>>> What will happen then? Will it create a new segment and then create
>>>>>>> windows in it for the next day's duration?
>>>>>>> What happens if it now gets a record from the previous day? Will it get
>>>>>>> discarded, or will it again have just the single value aggregated in it
>>>>>>> (previous values are lost)?
>>>>>>> So when a new segment is created, as I understand it, does it retain
>>>>>>> the older segments' data?
>>>>>>>
>>>>>>> This is a bit confusing, so it would be helpful if you can explain in a
>>>>>>> bit more detail.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Dec 3, 2016 at 5:18 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sachin,
>>>>>>>>
>>>>>>>> One thing to note is that the retention of the windowed stores works by
>>>>>>>> keeping multiple segments of the stores, where each segment stores a
>>>>>>>> time range which can potentially span multiple windows. If a new window
>>>>>>>> needs to be created that is further than the oldest segment's time
>>>>>>>> range + retention period, the oldest segment is dropped. From your code
>>>>>>>> it seems you do not override the retention period via until(...) on
>>>>>>>> TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the
>>>>>>>> default of one day is used.
>>>>>>>>
>>>>>>>> So with WallclockTimestampExtractor, since it is using system time, it
>>>>>>>> won't give you timestamps that span more than a day during a short
>>>>>>>> period of time. But if your own defined timestamps exceed that value,
>>>>>>>> then old segments will be dropped immediately, and hence the aggregate
>>>>>>>> values will be returned as a single value.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> The extractor is used in
>>>>>>>>>
>>>>>>>>> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>>>>>>>>>
>>>>>>>>> Let us know if you could resolve the problem or need more help.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 12/2/16 11:46 AM, Sachin Mittal wrote:
>>>>>>>>>> https://github.com/SOHU-Co/kafka-node/ this is the Node.js client I
>>>>>>>>>> am using. The version is 0.5x. Can you please tell me what code in
>>>>>>>>>> streams calls the timestamp extractor? I can look there to see if
>>>>>>>>>> there is any issue.
>>>>>>>>>>
>>>>>>>>>> Again, the issue happens only when producing the messages using a
>>>>>>>>>> producer that is compatible with Kafka version 0.8x. I see that this
>>>>>>>>>> producer does not send a record timestamp, as this was introduced in
>>>>>>>>>> version 0.10 only.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Sachin
>>>>>>>>>>
>>>>>>>>>> On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am not sure what is happening. That's why it would be good to have
>>>>>>>>>>> a toy example to reproduce the issue.
>>>>>>>>>>>
>>>>>>>>>>> What do you mean by "Kafka node version 0.5"?
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 12/2/16 11:30 AM, Sachin Mittal wrote:
>>>>>>>>>>>> I can provide the data, but the data does not seem to be the issue.
>>>>>>>>>>>> If I submit the same data and use the same timestamp extractor using
>>>>>>>>>>>> the Java client with Kafka version 0.10.0.1, aggregation works fine.
>>>>>>>>>>>> I find the issue only when submitting the data with kafka node
>>>>>>>>>>>> version 0.5.
>>>>>>>>>>>> It looks like the stream does not extract the time correctly in that
>>>>>>>>>>>> case.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Sachin
>>>>>>>>>>>>
>>>>>>>>>>>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you provide example input data (including timestamps) and the
>>>>>>>>>>>>> result? What is the expected result (i.e., what aggregation do you
>>>>>>>>>>>>> apply)?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> After much debugging I found an issue with timestamp
>>>>> extractor.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If I use a custom timestamp extractor with the following code:
>>>>>>>>>>>>>>     public static class MessageTimestampExtractor implements TimestampExtractor {
>>>>>>>>>>>>>>         public long extract(ConsumerRecord<Object, Object> record) {
>>>>>>>>>>>>>>             if (record.value() instanceof Message) {
>>>>>>>>>>>>>>                 return ((Message) record.value()).ts;
>>>>>>>>>>>>>>             } else {
>>>>>>>>>>>>>>                 return record.timestamp();
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> where Message has a long field ts which stores the timestamp, the
>>>>>>>>>>>>>> aggregation does not work.
>>>>>>>>>>>>>> Note I have checked and ts has valid timestamp values.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, if I replace it with, say, WallclockTimestampExtractor,
>>>>>>>>>>>>>> aggregation works fine.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I do not understand what could be the issue here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also note I am using Kafka Streams version 0.10.0.1 and I am
>>>>>>>>>>>>>> publishing messages via
>>>>>>>>>>>>>> https://github.com/SOHU-Co/kafka-node/ whose version is quite old
>>>>>>>>>>>>>> (0.5.x).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if there is some bug in timestamp extraction.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sachin,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is indeed a bit weird, and we'd like to try to reproduce your
>>>>>>>>>>>>>>> issue locally. Do you have sample input data for us to try out?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> I fixed that sorted set issue, but I am facing a weird problem
>>>>>>>>>>>>>>>> which I am not able to replicate.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here is the sample problem that I could isolate:
>>>>>>>>>>>>>>>> My class is like this:
>>>>>>>>>>>>>>>>     public static class Message implements Comparable<Message> {
>>>>>>>>>>>>>>>>         public long ts;
>>>>>>>>>>>>>>>>         public String message;
>>>>>>>>>>>>>>>>         public String key;
>>>>>>>>>>>>>>>>         public Message() {};
>>>>>>>>>>>>>>>>         public Message(long ts, String message, String key) {
>>>>>>>>>>>>>>>>             this.ts = ts;
>>>>>>>>>>>>>>>>             this.key = key;
>>>>>>>>>>>>>>>>             this.message = message;
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>>>>>>>>>>             long ts1 = paramT.ts;
>>>>>>>>>>>>>>>>             return ts > ts1 ? 1 : -1;
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> pipeline is like this:
>>>>>>>>>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>>>>>>>> .map(new KeyValueMapper<String, Message, KeyValue<String, Message>>() {
>>>>>>>>>>>>>>>>     public KeyValue<String, Message> apply(String key, Message value) {
>>>>>>>>>>>>>>>>         return new KeyValue<String, Message>(value.key, value);
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> })
>>>>>>>>>>>>>>>> .through(Serdes.String(), messageSerde, "test-window-key-stream")
>>>>>>>>>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>>>>>>>>     public SortedSet<Message> apply() {
>>>>>>>>>>>>>>>>         return new TreeSet<Message>();
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>>>>>>             SortedSet<Message> aggregate) {
>>>>>>>>>>>>>>>>         aggregate.add(value);
>>>>>>>>>>>>>>>>         return aggregate;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>>>>>>>> Serdes.String(), messagesSerde)
>>>>>>>>>>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>>>>>>>>         ...
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So basically I rekey the original message into another topic and
>>>>>>>>>>>>>>>> then aggregate it based on that key.
>>>>>>>>>>>>>>>> What I have observed is that when I use windowed aggregation, the
>>>>>>>>>>>>>>>> aggregator does not use the previously aggregated value.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>>>>>>         SortedSet<Message> aggregate) {
>>>>>>>>>>>>>>>>     aggregate.add(value);
>>>>>>>>>>>>>>>>     return aggregate;
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So in the above function the aggregate is an empty set for every
>>>>>>>>>>>>>>>> value entering the pipeline. When I remove the windowed
>>>>>>>>>>>>>>>> aggregation, the aggregate set retains previously aggregated
>>>>>>>>>>>>>>>> values.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am just not able to wrap my head around it. When I ran this type
>>>>>>>>>>>>>>>> of test locally on Windows it worked fine. However, a similar
>>>>>>>>>>>>>>>> pipeline setup, when run against production on Linux, behaves
>>>>>>>>>>>>>>>> strangely and always gets an empty aggregate set.
>>>>>>>>>>>>>>>> Any idea what could be the reason, or where I should look for the
>>>>>>>>>>>>>>>> problem? Does the length of the key string matter here? I will
>>>>>>>>>>>>>>>> later try to run the same simple setup on Linux and see what
>>>>>>>>>>>>>>>> happens. But this is very strange behavior.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Nov 23, 2016 at 12:04 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Sachin,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In the implementation of SortedSet, if the object implements the
>>>>>>>>>>>>>>>>> Comparable interface, its compareTo function is applied in
>>>>>>>>>>>>>>>>> "aggregate.add(value);", and hence if it returns 0, the element
>>>>>>>>>>>>>>>>> will not be added, since it is a Set.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Nov 21, 2016 at 10:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> What I find is that when I use a sorted set as the aggregate, it
>>>>>>>>>>>>>>>>>> fails to aggregate the values for which compareTo returns 0.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My class is like this:
>>>>>>>>>>>>>>>>>>     public class Message implements Comparable<Message> {
>>>>>>>>>>>>>>>>>>         public long ts;
>>>>>>>>>>>>>>>>>>         public String message;
>>>>>>>>>>>>>>>>>>         public Message() {};
>>>>>>>>>>>>>>>>>>         public Message(long ts, String message) {
>>>>>>>>>>>>>>>>>>             this.ts = ts;
>>>>>>>>>>>>>>>>>>             this.message = message;
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>         public int compareTo(Message paramT) {
>>>>>>>>>>>>>>>>>>             long ts1 = paramT.ts;
>>>>>>>>>>>>>>>>>>             return ts == ts1 ? 0 : ts > ts1 ? 1 : -1;
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> pipeline is like this:
>>>>>>>>>>>>>>>>>> builder.stream(Serdes.String(), messageSerde, "test-window-stream")
>>>>>>>>>>>>>>>>>> .aggregateByKey(new Initializer<SortedSet<Message>>() {
>>>>>>>>>>>>>>>>>>     public SortedSet<Message> apply() {
>>>>>>>>>>>>>>>>>>         return new TreeSet<Message>();
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> }, new Aggregator<String, Message, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>>>     public SortedSet<Message> apply(String aggKey, Message value,
>>>>>>>>>>>>>>>>>>             SortedSet<Message> aggregate) {
>>>>>>>>>>>>>>>>>>         aggregate.add(value);
>>>>>>>>>>>>>>>>>>         return aggregate;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> }, TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L),
>>>>>>>>>>>>>>>>>> Serdes.String(), messagesSerde)
>>>>>>>>>>>>>>>>>> .foreach(new ForeachAction<Windowed<String>, SortedSet<Message>>() {
>>>>>>>>>>>>>>>>>>     public void apply(Windowed<String> key, SortedSet<Message> messages) {
>>>>>>>>>>>>>>>>>>         ...
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So any message published between 10 and 20 seconds gets
>>>>>>>>>>>>>>>>>> aggregated in the 10 - 20 bucket, and I print the size of the set.
>>>>>>>>>>>>>>>>>> However, the output I get is the following:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 14
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 1
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 18
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 2
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 11
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 3
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 17
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 14
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 4
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 15
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 5
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 12
>>>>>>>>>>>>>>>>>> Aggregated: key2  10  20 -> 6
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Published: 12
>>>>>>>>>>>>>>>>>> Aggregated: 10  20 -> 6
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So, as you can see, any message that occurs again for the same
>>>>>>>>>>>>>>>>>> second, where compareTo returns 0, fails to get aggregated in the
>>>>>>>>>>>>>>>>>> pipeline. Notice the ones published at 14 and 12 seconds.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Now I am not sure if the problem is with Java, i.e. I should use the
>>>>>>>>>>>>>>>>>> Comparator interface and not Comparable for my Message object, or
>>>>>>>>>>>>>>>>>> if the problem is with Kafka Streams or with serializing and
>>>>>>>>>>>>>>>>>> de-serializing the set of messages. If I replace Set with List,
>>>>>>>>>>>>>>>>>> all is working fine.
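
The behaviour described above can be reproduced without Kafka at all. The
following is a minimal sketch, assuming a Message whose compareTo compares
only the timestamp in seconds; the Message class and the helper method here
are hypothetical stand-ins, not the code from the original program. A
TreeSet treats two elements as duplicates when compareTo returns 0, so the
second message for the same second is silently dropped, while a List keeps
both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

class TreeSetDuplicateDemo {

    // Hypothetical stand-in for the thread's Message class:
    // ordering is defined by the timestamp (in seconds) only.
    static final class Message implements Comparable<Message> {
        final long timestampSeconds;
        final String id;

        Message(long timestampSeconds, String id) {
            this.timestampSeconds = timestampSeconds;
            this.id = id;
        }

        @Override
        public int compareTo(Message other) {
            // Returns 0 for two different messages with the same second.
            return Long.compare(timestampSeconds, other.timestampSeconds);
        }
    }

    // Adds one message per timestamp to both a TreeSet and a List and
    // returns {set size, list size}.
    static int[] sizesAfterAdding(long... timestamps) {
        SortedSet<Message> set = new TreeSet<>();
        List<Message> list = new ArrayList<>();
        int i = 0;
        for (long ts : timestamps) {
            Message m = new Message(ts, "m" + i++);
            set.add(m);   // no-op if an element with compareTo == 0 exists
            list.add(m);  // always appended
        }
        return new int[] {set.size(), list.size()};
    }

    public static void main(String[] args) {
        // Same publish order as in the output above.
        int[] sizes = sizesAfterAdding(14, 18, 11, 17, 14, 15, 12, 12);
        System.out.println("TreeSet size: " + sizes[0]);
        System.out.println("List size:    " + sizes[1]);
    }
}
```

With the eight timestamps from the output above, the set ends up with 6
elements and the list with 8. If both messages should be kept in a sorted
set, one option is to break ties in compareTo on some other field that
distinguishes the messages (here that would be the hypothetical id).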
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Anyway, any ideas here would be appreciated; meanwhile let me see
>>>>>>>>>>>>>>>>>> what the best Java practice is here.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Nov 21, 2016 at 8:29 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Nov 21, 2016 at 1:06 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am using kafka_2.10-0.10.0.1.
>>>>>>>>>>>>>>>>>>>> Say I have a window of 60 minutes advanced by 15 minutes.
>>>>>>>>>>>>>>>>>>>> If the stream app, using the timestamp extractor, puts the message
>>>>>>>>>>>>>>>>>>>> in one or more bucket(s), it will get aggregated in those buckets.
>>>>>>>>>>>>>>>>>>>> I assume this statement is correct.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes.
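
To make the "one or more bucket(s)" point concrete, here is a minimal
sketch of how a single timestamp maps onto overlapping hopping windows. It
assumes windows are aligned to the epoch and are start-inclusive and
end-exclusive; the class and method names are hypothetical, and this is an
illustration of the arithmetic rather than Kafka Streams' own code.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {

    // Returns the start times of all windows [start, start + size) that
    // contain the given non-negative timestamp, for windows that begin at
    // multiples of 'advance'. All values are in the same time unit.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of 'advance' that is strictly greater than
        // (timestamp - size), clamped at 0: the earliest window that
        // still covers the timestamp.
        long start = (Math.max(0L, timestamp - size + advance) / advance) * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        long size = 60 * minute;     // 60-minute window
        long advance = 15 * minute;  // advanced by 15 minutes
        for (long start : windowStartsFor(70 * minute, size, advance)) {
            System.out.println("[" + start / minute + " min, "
                    + (start + size) / minute + " min)");
        }
    }
}
```

Under these assumptions, a record stamped at minute 70 falls into four
windows, starting at minutes 15, 30, 45 and 60, so it would be aggregated
four times, once per bucket.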
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also, say I restart the streams application; then bucket
>>>>>>>>>>>>>>>>>>>> aggregation will resume from the last point of halt.
>>>>>>>>>>>>>>>>>>>> I hope this is also correct.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What I noticed was that once a message was placed in one bucket,
>>>>>>>>>>>>>>>>>>>> that bucket was not getting new messages.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This should not happen...
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> However, when I ran a small test case replicating that, it worked
>>>>>>>>>>>>>>>>>>>> properly. There may be some issues in application reset.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ...and apparently it works (as expected) in your small test case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Do you have any further information that you could share with us so
>>>>>>>>>>>>>>>>>>> we can help you better?  What's the difference, for example, between
>>>>>>>>>>>>>>>>>>> your "normal" use case and the small test case you have been
>>>>>>>>>>>>>>>>>>> referring to?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Michael
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -- Guozhang

