[ 
https://issues.apache.org/jira/browse/KAFKA-7110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ray Chiang updated KAFKA-7110:
------------------------------
    Component/s: streams

> Windowed changelog keys not deserialized properly by TimeWindowedSerde
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7110
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Shawn Nguyen
>            Priority: Major
>
> Currently the TimeWindowedSerde does not deserialize the windowed keys from a 
> changelog topic properly. A few assumptions made in the 
> TimeWindowedDeserializer prevent the changelog windowed keys from being 
> correctly deserialized.
> 1) In the from method of WindowKeySchema (called in deserialize in 
> TimeWindowedDeserializer), we extract the window from the binary key by 
> calling getLong(binaryKey.length - TIMESTAMP_SIZE). However, the changelog for 
> ChangeLoggingWindowBytesStore logs the windowed key as:
>  
> {noformat}
> changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, 
> maybeUpdateSeqnumForDups()), value);
> {noformat}
>  
> In toStoreKeyBinary, we store the key in 
> {noformat}
> final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
> TIMESTAMP_SIZE + SEQNUM_SIZE);
> {noformat}
> so the stored key ends with the seqnum (used for de-duping). When we 
> deserialize, however, we assume the windowed changelog key has no seqnum, so 
> the extracted window holds gibberish values because the bytes read are not 
> aligned with the timestamp.
> The fix here is to introduce a new Serde in WindowSerdes that explicitly 
> handles windowed changelog input topics. 
>  
> 2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to 
> Long.MAX_VALUE:
>  
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
>
> public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
>     this.inner = inner;
>     this.windowSize = windowSize;
> }
> {noformat}
> This will cause the end times to be gibberish when we extract the window, 
> since the window end is computed from the start time and the windowSize in:
>  
> {noformat}
> public static <K> Windowed<K> from(final byte[] binaryKey,
>                                    final long windowSize,
>                                    final Deserializer<K> deserializer,
>                                    final String topic) {
>     final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
>     System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
>     final K key = deserializer.deserialize(topic, bytes);
>     final Window window = extractWindow(binaryKey, windowSize);
>     return new Windowed<>(key, window);
> }
>
> private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
>     final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
>     final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
>     return timeWindowForSize(start, windowSize);
> }
> {noformat}
> So in the new serde, we will make windowSize a constructor param that can be 
> supplied.
> I've started a patch, and will prepare a PR for the fix for 1) and 2) above. 
> Let me know if this sounds reasonable. 
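
The misalignment described in 1) above can be reproduced without Kafka on the classpath. The following is only a rough sketch: the class and method names are hypothetical stand-ins, and the sizes (8-byte timestamp, 4-byte seqnum) are assumptions inferred from the layout quoted in the issue, not values copied from the Kafka source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone sketch; not the actual WindowKeySchema code.
class SeqnumMisalignmentSketch {
    static final int TIMESTAMP_SIZE = 8; // assumed: timestamp stored as a long
    static final int SEQNUM_SIZE = 4;    // assumed: seqnum stored as an int

    // Writes the layout described in the issue: [key bytes][timestamp][seqnum].
    static byte[] toStoreKeyBinary(byte[] serializedKey, long timestamp, int seqnum) {
        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Reads the start time as if the key ended with the timestamp (no seqnum).
    static long startAssumingNoSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Reads the start time while skipping the trailing seqnum.
    static long startSkippingSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        long windowStart = 1_530_000_000_000L;
        byte[] stored = toStoreKeyBinary(key, windowStart, 0);

        System.out.println("written start:        " + windowStart);
        System.out.println("read without seqnum:  " + startAssumingNoSeqnum(stored));
        System.out.println("read skipping seqnum: " + startSkippingSeqnum(stored));
    }
}
```

Reading at binaryKey.length - TIMESTAMP_SIZE picks up the low four bytes of the timestamp followed by the seqnum, which is why the extracted window start no longer matches what was written.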

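The effect of the Long.MAX_VALUE default described in 2) above can be illustrated in the same spirit. This sketch assumes the window end is computed as start + windowSize; the issue text does not show timeWindowForSize, so windowEnd below is a hypothetical stand-in, and the real method may clamp or otherwise handle the overflow.

```java
// Hypothetical stand-alone sketch; not the actual timeWindowForSize code.
class WindowSizeSketch {
    // Assumed end computation: plain long addition, which wraps on overflow.
    static long windowEnd(long startMs, long windowSizeMs) {
        return startMs + windowSizeMs;
    }

    public static void main(String[] args) {
        long start = 1_530_000_000_000L;
        long oneMinuteMs = 60_000L;

        // With the default size the sum overflows and the end lands before the start.
        System.out.println("end with Long.MAX_VALUE: " + windowEnd(start, Long.MAX_VALUE));
        // With the size the windows were actually created with, the end is sensible.
        System.out.println("end with 60000 ms:       " + windowEnd(start, oneMinuteMs));
    }
}
```

Under that assumption, any positive start time combined with Long.MAX_VALUE yields an end earlier than the start, which is consistent with making windowSize a constructor parameter as proposed.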


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
