sanghyeok An created KAFKA-20311:
------------------------------------
Summary: Cached metered key-value stores can deserialize old
values with the wrong headers when sendOldValues=true
Key: KAFKA-20311
URL: https://issues.apache.org/jira/browse/KAFKA-20311
Project: Kafka
Issue Type: Bug
Reporter: sanghyeok An
Assignee: sanghyeok An
In the cached flush path, MeteredKeyValueStore deserializes both newValue and
oldValue using the single set of headers carried by the flush record.
However, the flush record is built from:
* newValue bytes from the cache
* oldValue bytes read from the underlying store
* one Record.headers() from the cached entry being flushed
This means oldValue bytes and newValue bytes are not guaranteed to have been
serialized with the same headers.
When sendOldValues=true and a header-aware serde is used, oldValue may
therefore be deserialized with headers that differ from the headers originally
used to serialize it.
As a result:
* oldValue may be reconstructed incorrectly, or
* deserialization may fail with an exception, depending on the serde
implementation.
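To make the two failure modes concrete, here is a minimal plain-Java sketch of a header-dependent serde. It does not use Kafka's Serde or Headers types: the class HeaderAwareStringSerde, the "charset" header name, and the Map-based headers are all hypothetical stand-ins chosen for illustration.

```java
import java.nio.charset.Charset;
import java.util.Map;

// Hypothetical stand-in for a header-aware serde; not a Kafka class.
final class HeaderAwareStringSerde {
    // The "charset" header name is an assumption made for this sketch.
    static final String CHARSET_HEADER = "charset";

    static byte[] serialize(Map<String, String> headers, String value) {
        return value.getBytes(charsetFrom(headers));
    }

    static String deserialize(Map<String, String> headers, byte[] bytes) {
        return new String(bytes, charsetFrom(headers));
    }

    // Both directions depend on the header; a missing header is an error.
    private static Charset charsetFrom(Map<String, String> headers) {
        String name = headers.get(CHARSET_HEADER);
        if (name == null) {
            throw new IllegalStateException("missing header: " + CHARSET_HEADER);
        }
        return Charset.forName(name);
    }

    public static void main(String[] args) {
        Map<String, String> oldHeaders = Map.of(CHARSET_HEADER, "UTF-16BE");
        Map<String, String> newHeaders = Map.of(CHARSET_HEADER, "UTF-8");
        byte[] oldBytes = serialize(oldHeaders, "old");

        // Matching headers round-trip the value.
        System.out.println(deserialize(oldHeaders, oldBytes).equals("old"));
        // Mismatched headers silently reconstruct a different string.
        System.out.println(deserialize(newHeaders, oldBytes).equals("old"));
    }
}
```

In this sketch, main prints true for the matching headers and false for the mismatched ones. Passing headers without the "charset" entry throws instead, which corresponds to the second failure mode listed above.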
{*}Affected Stores{*}
* MeteredWindowStore
* MeteredKeyValueStore
* MeteredSessionStore
{*}Why this happens{*}
CachingKeyValueStore creates the flush record by:
# taking newValue bytes from the cache
# reading oldValue bytes from the underlying store
# attaching entry.entry().context().headers() to the flush Record
MeteredKeyValueStore.setFlushListener then deserializes both change.newValue
and change.oldValue using record.headers().
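The sequence above can be simulated in plain Java. This is only a sketch under stated assumptions, not the actual CachingKeyValueStore or MeteredKeyValueStore code: CachedFlushSketch, its flush method, and the version-header format (version "1" is raw UTF-8, version "2" adds a one-byte prefix) are hypothetical and exist only to show the old value being decoded with the cached entry's header.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the cached flush path; none of this is Kafka code.
final class CachedFlushSketch {

    // Hypothetical format: version "1" is raw UTF-8, version "2" adds a 0x02 prefix byte.
    static byte[] serialize(String version, String value) {
        byte[] text = value.getBytes(StandardCharsets.UTF_8);
        if ("1".equals(version)) {
            return text;
        }
        if ("2".equals(version)) {
            byte[] out = new byte[text.length + 1];
            out[0] = 2;
            System.arraycopy(text, 0, out, 1, text.length);
            return out;
        }
        throw new IllegalArgumentException("unknown version: " + version);
    }

    static String deserialize(String version, byte[] bytes) {
        if ("1".equals(version)) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        if ("2".equals(version)) {
            if (bytes.length == 0 || bytes[0] != 2) {
                throw new IllegalStateException("payload is not in version 2 format");
            }
            return new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("unknown version: " + version);
    }

    // Follows the three steps above, then decodes both values with one header.
    static String[] flush(Map<String, byte[]> underlying, String key,
                          byte[] cachedNewBytes, String cachedVersionHeader) {
        byte[] newBytes = cachedNewBytes;          // 1. new value from the cache
        byte[] oldBytes = underlying.get(key);     // 2. old value from the underlying store
        String recordHeader = cachedVersionHeader; // 3. header of the cached entry only
        String newValue = deserialize(recordHeader, newBytes);
        // oldBytes may have been written under a different header value.
        String oldValue = deserialize(recordHeader, oldBytes);
        return new String[] {oldValue, newValue};
    }

    public static void main(String[] args) {
        Map<String, byte[]> underlying = new HashMap<>();
        underlying.put("k", serialize("1", "old")); // old value written under version 1
        try {
            flush(underlying, "k", serialize("2", "new"), "2");
            System.out.println("flush succeeded");
        } catch (IllegalStateException e) {
            System.out.println("flush failed: " + e.getMessage());
        }
    }
}
```

In this simulation the new value decodes correctly, while the old value fails because its bytes were written under version "1" but are read with the version "2" header taken from the cached entry.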
{*}Condition{*}
This issue requires:
* caching enabled
* sendOldValues=true
* a serde whose deserialize(topic, headers, bytes) behavior depends on headers
{*}Expected behavior{*}
The cached flush path should not deserialize oldValue with flush-record headers
unless those headers are guaranteed to match the headers originally used to
serialize oldValue.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)