[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142877#comment-16142877 ]
Richard Yu edited comment on KAFKA-4468 at 8/26/17 6:03 PM:
------------------------------------------------------------

When I looked through the references to WindowedDeserializer, it appears there are none other than unit tests. The other approach is to look through the window store classes for the window size. Since RocksDBWindowStore is the most commonly used implementation, I looked through that class and found that, when instantiating windows, RocksDBWindowStore calls the class WindowStoreIteratorWrapper. Lines 54-58 of that class contain the following:

{code}
@Override
public Windowed<Bytes> peekNextKey() {
    final Bytes next = bytesIterator.peekNextKey();
    // decode the window start timestamp from the binary store key
    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
    // decode the original record key from the binary store key
    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
    // build a window of length windowSize starting at the decoded timestamp
    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
}
{code}

In this method, {{windowSize}} is already known to the class. Therefore, it can build a window with a bounded end timestamp rather than an unlimited one, because the fixed size is already known. In other words, for WindowedDeserializer to do the same, the fixed window length would have to be available somewhere in the chain of calls through which WindowedDeserializer is invoked. However, I could not find the window length anywhere in that sequence of calls between the window store and WindowedDeserializer. Consequently, there might not be a practical way to retrieve the length of the window.

was (Author: yohan123):
When looking through the references for WindowedDeserializer, it appears that there were none other than unit tests. The other approach is to look through the window store classes and look for the window size. Since RocksDBWindowStore is the class that is most commonly used, I looked through that particular class and found that, when instantiating RocksDBWindowStore's window, they called the class WindowStoreIteratorWrapper. In lines 54-58 of that particular class, this is what I found:

{code}
@Override
public Windowed<Bytes> peekNextKey() {
    final Bytes next = bytesIterator.peekNextKey();
    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
}
{code}

In this method, {{windowSize}} is known to the class already. Therefore, this is a successful implementation of a window with a bounded lifetime, since the fixed size is already known. In other words, for us to find the fixed length of time that the window needs to be alive, it must be found in the series of calls in which WindowedDeserializer is called. However, when attempting to find it in this sequence of calls between the window store and WindowedDeserializer, the window length could not be found. In other words, there might not be a practical way to retrieve the length of the window.

> Correctly calculate the window end timestamp after read from state stores
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4468
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the
> start timestamp of the window as part of the combo-key as (start-timestamp,
> key). The reason that we do not add the end-timestamp as well is that we can
> always calculate it from the start timestamp + window_length, and hence we
> can save 8 bytes per key on the persistent KV store.
> However, after reading it back (via {{WindowedDeserializer}}) we do not set its
> end timestamp correctly but just read it as an {{UnlimitedWindow}}. We should
> fix this by calculating its end timestamp as mentioned above.
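The storage trick in the issue description, and the size-aware decoding the comment is looking for, can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not Kafka's code: the names {{WindowKeySketch}}, {{TimeWindowSketch}}, {{WindowedSketch}}, {{toStoreKey}} and {{fromStoreKey}} are hypothetical, and the byte layout (raw key bytes followed by an 8-byte big-endian start timestamp) is an assumption; the real RocksDB window store layout may differ. It shows why only the start timestamp needs to be persisted: a decoder that is handed the fixed window size can rebuild the end as start + window size, the same information {{WindowStoreIteratorWrapper}} relies on when it calls {{timeWindowForSize(timestamp, windowSize)}}. The open question from the comment, how to get that size into {{WindowedDeserializer}}, is side-stepped here by simply passing it in as a parameter.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowKeySketch {

    /** Hypothetical bounded window covering [startMs, endMs). */
    static final class TimeWindowSketch {
        final long startMs;
        final long endMs;

        TimeWindowSketch(long startMs, long endMs) {
            if (endMs <= startMs) {
                throw new IllegalArgumentException("end must be after start");
            }
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    /** Hypothetical decoded entry: the record key plus its reconstructed window. */
    static final class WindowedSketch {
        final byte[] key;
        final TimeWindowSketch window;

        WindowedSketch(byte[] key, TimeWindowSketch window) {
            this.key = key;
            this.window = window;
        }
    }

    private static final int TIMESTAMP_SIZE = Long.BYTES; // 8 bytes

    /** Assumed layout: [key bytes][8-byte start timestamp]; the end timestamp is NOT stored. */
    static byte[] toStoreKey(byte[] key, long startMs) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
                .put(key)
                .putLong(startMs)
                .array();
    }

    /**
     * Size-aware decoding: because the window size is fixed and supplied by the caller,
     * the end timestamp is recomputed as start + windowSizeMs instead of falling back
     * to an unbounded window.
     */
    static WindowedSketch fromStoreKey(byte[] storeKey, long windowSizeMs) {
        if (storeKey.length < TIMESTAMP_SIZE) {
            throw new IllegalArgumentException("store key too short");
        }
        ByteBuffer buf = ByteBuffer.wrap(storeKey);
        byte[] key = new byte[storeKey.length - TIMESTAMP_SIZE];
        buf.get(key);                  // leading bytes are the record key
        long startMs = buf.getLong();  // trailing 8 bytes are the start timestamp
        return new WindowedSketch(key, new TimeWindowSketch(startMs, startMs + windowSizeMs));
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        long windowSizeMs = 60_000L; // hypothetical 1-minute window
        byte[] storeKey = toStoreKey(key, 1_500_000L);
        WindowedSketch w = fromStoreKey(storeKey, windowSizeMs);
        System.out.println(new String(w.key, StandardCharsets.UTF_8)
                + " [" + w.window.startMs + ", " + w.window.endMs + ")");
        System.out.println("stored bytes: " + storeKey.length);
    }
}
```

Running {{main}} prints "user-42 [1500000, 1560000)" followed by "stored bytes: 15" (7 key bytes plus the 8-byte start timestamp). Persisting the end timestamp as well would add another 8 bytes per key, which is the saving the description refers to; the cost is that every reader must know the window size.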
-- This message was sent by Atlassian JIRA (v6.4.14#64029)