[ https://issues.apache.org/jira/browse/KAFKA-7110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ray Chiang updated KAFKA-7110:
------------------------------
    Component/s: streams

> Windowed changelog keys not deserialized properly by TimeWindowedSerde
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7110
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7110
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Shawn Nguyen
>            Priority: Major
>
> Currently the TimeWindowedSerde does not deserialize the windowed keys from a
> changelog topic properly. A few assumptions made in the
> TimeWindowedDeserializer prevent the changelog windowed keys from being
> correctly deserialized.
> 1) In the from method of WindowKeySchema (called in deserialize in
> TimeWindowedDeserializer), we extract the window from the binary key, but we
> call getLong(binaryKey.length - TIMESTAMP_SIZE). However, the changelog for
> ChangeLoggingWindowBytesStore will log the windowed key as:
>
> {noformat}
> changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()), value);
> {noformat}
>
> In toStoreKeyBinary, we store the key in
> {noformat}
> final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
> {noformat}
> with the seqnum (used for de-duping). So the eventual result is that when we
> deserialize, we do not assume the windowed changelog key has a seq_num, and
> the window extracted will be gibberish values since the bytes extracted won't
> be aligned.
> The fix here is to introduce a new Serde in WindowSerdes that will explicitly
> handle a windowed changelog input topic.
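
To make the misalignment in 1) concrete, here is a minimal standalone sketch in
plain Java. It is not the Kafka code: the class and method names are
hypothetical, and it assumes TIMESTAMP_SIZE is 8 bytes (one long), SEQNUM_SIZE
is 4 bytes (one int), and the layout is [serialized key][timestamp][seqnum] as
described above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; only the byte layout follows the issue description.
class SeqnumLayoutDemo {
    static final int TIMESTAMP_SIZE = 8; // assumption: timestamp stored as a long
    static final int SEQNUM_SIZE = 4;    // assumption: seqnum stored as an int

    // Builds [serialized key][timestamp][seqnum], the changelog layout described above.
    static byte[] toStoreKeyBinary(final byte[] serializedKey, final long timestamp, final int seqnum) {
        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(serializedKey);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Reads the last 8 bytes as the timestamp, i.e. assumes there is no trailing seqnum.
    static long readTimestampAssumingNoSeqnum(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
    }

    // Skips the trailing seqnum before reading the timestamp.
    static long readTimestampSkippingSeqnum(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(final String[] args) {
        final byte[] key = "k".getBytes(StandardCharsets.UTF_8);
        final byte[] binaryKey = toStoreKeyBinary(key, 1000L, 7);

        // The read straddles the low half of the timestamp and the seqnum.
        System.out.println(readTimestampAssumingNoSeqnum(binaryKey)); // 4294967296007
        // Accounting for the seqnum recovers the original timestamp.
        System.out.println(readTimestampSkippingSeqnum(binaryKey));   // 1000
    }
}
```

With a start timestamp of 1000 and a seqnum of 7, the unadjusted read returns
1000 * 2^32 + 7, because it picks up the last four timestamp bytes followed by
the four seqnum bytes. A changelog-aware deserializer would need to apply the
extra SEQNUM_SIZE offset, and also exclude those bytes from the key portion.
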
>
> 2) In the constructor of TimeWindowedDeserializer, the windowSize is fixed to
> Long.MAX_VALUE:
>
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
>
> public TimeWindowedDeserializer(final Deserializer<T> inner, final long windowSize) {
>     this.inner = inner;
>     this.windowSize = windowSize;
> }
> {noformat}
> This will cause the end times to be gibberish when we extract the window,
> since the window end is computed from the start time and the windowSize in:
>
> {noformat}
> public static <K> Windowed<K> from(final byte[] binaryKey, final long windowSize, final Deserializer<K> deserializer, final String topic) {
>     final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE];
>     System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
>     final K key = deserializer.deserialize(topic, bytes);
>     final Window window = extractWindow(binaryKey, windowSize);
>     return new Windowed<>(key, window);
> }
>
> private static Window extractWindow(final byte[] binaryKey, final long windowSize) {
>     final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
>     final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
>     return timeWindowForSize(start, windowSize);
> }
> {noformat}
> So in the new serde, we will make windowSize a constructor param that can be
> supplied.
> I've started a patch, and will prepare a PR for the fix for 1) and 2) above.
> Let me know if this sounds reasonable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
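
As a rough illustration of why a windowSize of Long.MAX_VALUE is a problem in
2), the sketch below uses a hypothetical helper, windowEnd, that assumes the
window end is derived as start + windowSize. That formula is an assumption made
for illustration; the body of timeWindowForSize is not quoted in the issue.

```java
// Hypothetical demo class; windowEnd is an assumed stand-in, not Kafka code.
class WindowSizeDemo {
    // Assumption: the window end is the start plus the window size.
    // Plain long addition wraps around silently on overflow.
    static long windowEnd(final long startMs, final long windowSize) {
        return startMs + windowSize;
    }

    public static void main(final String[] args) {
        final long startMs = 1_000L;

        // With the real window size supplied, the end is meaningful.
        System.out.println(windowEnd(startMs, 60_000L));        // 61000

        // With the Long.MAX_VALUE default, the sum overflows and goes negative.
        System.out.println(windowEnd(startMs, Long.MAX_VALUE)); // -9223372036854774809
    }
}
```

Under that assumption, any positive start combined with the default size yields
an unusable end time, which is consistent with letting callers pass the actual
windowSize to the constructor as proposed above.
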