[ https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142877#comment-16142877 ]

Richard Yu edited comment on KAFKA-4468 at 8/26/17 6:10 PM:
------------------------------------------------------------

When I looked through the references to WindowedDeserializer, I found none apart from unit tests.

The other approach is to look through the window store classes for the window size.
Since RocksDBWindowStore is the most commonly used implementation, I looked through that class
and found that, when instantiating windows, RocksDBWindowStore uses the class WindowStoreIteratorWrapper.
Lines 54-58 of that class contain the following:

{code}
@Override
public Windowed<Bytes> peekNextKey() {
    final Bytes next = bytesIterator.peekNextKey();
    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
}
{code}

In this method, {{windowSize}} is already known, so the iterator can compute the end timestamp and return a window of fixed, limited length rather than an unlimited one.
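Once both the stored start timestamp and the size are available, rebuilding the bounded window is a single addition. The following is a minimal sketch under that assumption; TimeWindowSketch and timeWindowForSize are hypothetical stand-ins written for this comment, not the actual Kafka Streams TimeWindow or WindowStoreUtils code, and clamping the end to Long.MAX_VALUE on overflow is a choice made only for the sketch.

```java
// Hypothetical sketch (not Kafka's WindowStoreUtils): rebuild a bounded window
// from its stored start timestamp and a window size the caller already knows.
public final class WindowForSizeSketch {

    /** Minimal stand-in for a time window covering [startMs, endMs). */
    static final class TimeWindowSketch {
        final long startMs;
        final long endMs;

        TimeWindowSketch(final long startMs, final long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }

        @Override
        public String toString() {
            return "[" + startMs + ", " + endMs + ")";
        }
    }

    /** end = start + size; clamped to Long.MAX_VALUE if the sum overflows. */
    static TimeWindowSketch timeWindowForSize(final long startMs, final long windowSizeMs) {
        if (windowSizeMs < 0) {
            throw new IllegalArgumentException("windowSizeMs must not be negative");
        }
        long endMs;
        try {
            endMs = Math.addExact(startMs, windowSizeMs);
        } catch (final ArithmeticException overflow) {
            endMs = Long.MAX_VALUE; // clamping is a choice made for this sketch
        }
        return new TimeWindowSketch(startMs, endMs);
    }

    public static void main(final String[] args) {
        System.out.println(timeWindowForSize(1_000L, 5_000L));           // [1000, 6000)
        System.out.println(timeWindowForSize(Long.MAX_VALUE - 1L, 10L)); // end clamped to Long.MAX_VALUE
    }
}
```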

WindowedDeserializer, however, does not know the window size, so the size would have to be passed to it somewhere along the chain of calls in which it is involved. When I searched for the window length, though, I found no such chain of calls between WindowedDeserializer and the window store.
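For illustration, here is roughly what a deserializer could do if the window size were handed to it up front, for example through its constructor by code that knows the window definition. This is a hypothetical sketch, not a proposed Kafka API: SizedWindowedDecoderSketch and its methods are invented names, and it assumes the serialized form is the key bytes followed by an 8-byte big-endian start timestamp.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: a windowed-key decoder that receives the window size up
// front, so it can compute the end timestamp instead of leaving it unbounded.
// Assumed layout (illustration only): [key bytes][8-byte big-endian start timestamp].
public final class SizedWindowedDecoderSketch {

    static final int TIMESTAMP_SIZE = 8;

    /** Decoded result: raw key bytes plus the window bounds [startMs, endMs). */
    static final class DecodedWindowedKey {
        final byte[] key;
        final long startMs;
        final long endMs;

        DecodedWindowedKey(final byte[] key, final long startMs, final long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    private final long windowSizeMs;

    /** The size must come from whoever knows the window definition. */
    SizedWindowedDecoderSketch(final long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSizeMs must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    DecodedWindowedKey decode(final byte[] data) {
        if (data.length < TIMESTAMP_SIZE) {
            throw new IllegalArgumentException("data too short to hold a timestamp");
        }
        final int keyLength = data.length - TIMESTAMP_SIZE;
        final byte[] key = Arrays.copyOfRange(data, 0, keyLength);
        final long startMs = ByteBuffer.wrap(data, keyLength, TIMESTAMP_SIZE).getLong();
        // The end is derived rather than stored; clamp instead of overflowing.
        final long endMs = startMs > Long.MAX_VALUE - windowSizeMs
                ? Long.MAX_VALUE
                : startMs + windowSizeMs;
        return new DecodedWindowedKey(key, startMs, endMs);
    }

    /** Demo helper: builds [key][start] bytes in the assumed layout. */
    static byte[] encode(final byte[] key, final long startMs) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE).put(key).putLong(startMs).array();
    }

    public static void main(final String[] args) {
        final SizedWindowedDecoderSketch decoder = new SizedWindowedDecoderSketch(60_000L);
        final byte[] bytes = encode("user-42".getBytes(StandardCharsets.UTF_8), 120_000L);
        final DecodedWindowedKey decoded = decoder.decode(bytes);
        System.out.println(new String(decoded.key, StandardCharsets.UTF_8)
                + " [" + decoded.startMs + ", " + decoded.endMs + ")"); // user-42 [120000, 180000)
    }
}
```

The only design choice here is making the size an explicit input; without it, the decoder has nothing to add to the start timestamp and can only leave the end unbounded.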

I am considering other practical ways to retrieve the window length.


was (Author: yohan123):
When I looked through the references to WindowedDeserializer, I found none apart from unit tests.

The other approach is to look through the window store classes for the window size.
Since RocksDBWindowStore is the most commonly used implementation, I looked through that class
and found that, when instantiating windows, RocksDBWindowStore uses the class WindowStoreIteratorWrapper.
Lines 54-58 of that class contain the following:

{code}
@Override
public Windowed<Bytes> peekNextKey() {
    final Bytes next = bytesIterator.peekNextKey();
    final long timestamp = WindowStoreUtils.timestampFromBinaryKey(next.get());
    final Bytes key = WindowStoreUtils.bytesKeyFromBinaryKey(next.get());
    return new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize));
}
{code}

In this method, {{windowSize}} is already known to the class. Therefore, this is a working implementation of a time-limited window, since the fixed size is already known.

In other words, for us to find the fixed window length, it must be available somewhere in the series of calls through which WindowedDeserializer is invoked. However, when I looked for it in the sequence of calls between the window store and WindowedDeserializer, the window length could not be found.

Therefore, there might not be a practical way to retrieve the length of the window.

> Correctly calculate the window end timestamp after read from state stores
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4468
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the 
> start timestamp of the window as part of the combo-key as (start-timestamp, 
> key). The reason that we do not add the end-timestamp as well is that we can 
> always calculate it from the start timestamp + window_length, and hence we 
> can save 8 bytes per key on the persistent KV store.
> However, after reading it (via {{WindowedDeserializer}}) we do not set its end 
> timestamp correctly but simply read it as an {{UnlimitedWindow}}. We should fix 
> this by calculating its end timestamp as described above.
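
A small sketch of the storage trade-off described in the issue, using the (start-timestamp, key) order given there: the start-only composite key is 8 bytes shorter per entry, and the end is recomputed from the window length on read. ComboKeySketch and its methods are hypothetical names for illustration, and the exact byte layout used by the real store may differ.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the trade-off: storing only the start timestamp in the
// composite key saves one 8-byte long per entry; the end is recomputed on read.
// Layout (illustration only): [8-byte start][key bytes].
public final class ComboKeySketch {

    static byte[] startOnlyKey(final byte[] key, final long startMs) {
        return ByteBuffer.allocate(Long.BYTES + key.length).putLong(startMs).put(key).array();
    }

    static byte[] startAndEndKey(final byte[] key, final long startMs, final long endMs) {
        return ByteBuffer.allocate(2 * Long.BYTES + key.length)
                .putLong(startMs).putLong(endMs).put(key).array();
    }

    /** end = start + windowLength; no overflow handling in this sketch. */
    static long endFromStartOnlyKey(final byte[] comboKey, final long windowLengthMs) {
        final long startMs = ByteBuffer.wrap(comboKey).getLong(); // first 8 bytes
        return startMs + windowLengthMs;
    }

    public static void main(final String[] args) {
        final byte[] key = {1, 2, 3};
        final byte[] compact = startOnlyKey(key, 1_000L);
        final byte[] full = startAndEndKey(key, 1_000L, 6_000L);
        System.out.println(full.length - compact.length);         // 8
        System.out.println(endFromStartOnlyKey(compact, 5_000L)); // 6000
    }
}
```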



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
