Shawn Nguyen created KAFKA-7110:
-----------------------------------
Summary: Windowed changelog keys not deserialized properly by
TimeWindowedSerde
Key: KAFKA-7110
URL: https://issues.apache.org/jira/browse/KAFKA-7110
Project: Kafka
Issue Type: Bug
Reporter: Shawn Nguyen
Fix For: 1.1.0
Currently, TimeWindowedSerde does not properly deserialize windowed keys from a
changelog topic. A few assumptions made in TimeWindowedDeserializer prevent the
changelog windowed keys from being deserialized correctly.
1) The from method of WindowKeySchema (called from deserialize in
TimeWindowedDeserializer) extracts the window from the binary key by calling
getLong(binaryKey.length - TIMESTAMP_SIZE), which assumes the timestamp occupies
the last bytes of the key. However, ChangeLoggingWindowBytesStore logs the
windowed key to the changelog as:
{noformat}
changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp,
maybeUpdateSeqnumForDups()), value);
{noformat}
In toStoreKeyBinary, the key is written into a buffer allocated as
{noformat}
final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length +
TIMESTAMP_SIZE + SEQNUM_SIZE);
{noformat}
with the seqnum (used for de-duping). So the eventual result is that when we
deserialize, we do not account for the seqnum in the windowed changelog key, and
the extracted window will contain gibberish values since the bytes read won't be
aligned with the timestamp.
The fix here is to introduce a new Serde in WindowSerdes that explicitly handles
windowed changelog input topics.
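The misalignment described in point 1 can be reproduced in isolation. The following is a minimal sketch, not Kafka code: the class ChangelogKeyLayoutDemo and its two read methods are hypothetical names introduced for illustration, and the layout (serialized key, then an 8-byte timestamp, then a 4-byte seqnum) is assumed from the allocation shown above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone demo; only the constant names come from the report.
class ChangelogKeyLayoutDemo {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Assumed layout: [serialized key][8-byte timestamp][4-byte seqnum].
    static byte[] toStoreKeyBinary(final byte[] serializedKey, final long timestamp, final int seqnum) {
        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Read that assumes the timestamp is the last 8 bytes (no trailing seqnum).
    static long readTimestampIgnoringSeqnum(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Read that skips the trailing seqnum before locating the timestamp.
    static long readTimestampWithSeqnum(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(final String[] args) {
        final byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        final long timestamp = 1_530_000_000_000L;
        final byte[] stored = toStoreKeyBinary(key, timestamp, 0);
        // The first read straddles the low half of the timestamp and the seqnum.
        System.out.println("ignoring seqnum: " + readTimestampIgnoringSeqnum(stored));
        System.out.println("with seqnum:     " + readTimestampWithSeqnum(stored));
    }
}
```

Only the second read recovers the original timestamp; the first combines the low four bytes of the timestamp with the seqnum bytes.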
2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to
Long.MAX_VALUE:
{noformat}
// TODO: fix this part as last bits of KAFKA-4468
public TimeWindowedDeserializer(final Deserializer<T> inner) {
    this(inner, Long.MAX_VALUE);
}
public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
    this.inner = inner;
    this.windowSize = windowSize;
}
{noformat}
This will cause the end times to be gibberish when we extract the window, since
the end time is computed from the start time and windowSize in:
{noformat}
public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize,
                                   final Deserializer<K> deserializer, final String topic) {
    final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
    System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
    final K key = deserializer.deserialize(topic, bytes);
    final Window window = extractWindow(binaryKey, windowSize);
    return new Windowed<>(key, window);
}

private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
    final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
    final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
    return timeWindowForSize(start, windowSize);
}
{noformat}
So in the new serde, windowSize will be a constructor parameter supplied by the
caller.
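A rough sketch of how a changelog-aware reader might combine both points follows. ChangelogWindowKeyReader is a hypothetical class, not part of Kafka or of the eventual fix. The report does not show the body of timeWindowForSize, so the sketch assumes the window end is start + windowSize; clamping on overflow is the sketch's own choice, and timestamps are assumed to be non-negative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only; this is not a Kafka class.
class ChangelogWindowKeyReader {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    private final long windowSize;

    // windowSize is supplied by the caller instead of defaulting to Long.MAX_VALUE.
    ChangelogWindowKeyReader(final long windowSize) {
        this.windowSize = windowSize;
    }

    // Strips the trailing timestamp and seqnum, leaving the serialized key bytes.
    byte[] keyBytes(final byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Returns {start, end} in milliseconds.
    long[] window(final byte[] binaryKey) {
        final long start = ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
        // ASSUMPTION: end = start + windowSize, clamped to Long.MAX_VALUE on overflow.
        final long end = start + windowSize;
        return new long[] {start, end < 0 ? Long.MAX_VALUE : end};
    }

    public static void main(final String[] args) {
        // Build a changelog-style key: 2 key bytes, timestamp 120000, seqnum 0.
        final byte[] binaryKey = ByteBuffer.allocate(2 + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(new byte[] {'k', '1'})
                .putLong(120_000L)
                .putInt(0)
                .array();
        final long[] w = new ChangelogWindowKeyReader(60_000L).window(binaryKey);
        System.out.println("start=" + w[0] + " end=" + w[1]); // start=120000 end=180000
    }
}
```

With a one-minute windowSize the end time is meaningful; constructing the same reader with Long.MAX_VALUE would clamp every end time to Long.MAX_VALUE, regardless of the actual window.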
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)