Actually one more question:

Do we support upgrading to the new header store from both `KeyValueStore` and `TimestampedKeyValueStore` (similar for windowed store)? The KIP is not explicit about it.

Not sure if it would be complex to support both upgrades -- it does sound useful. Of course, it's not strictly necessary, because users could also upgrade to the timestamped store first as an intermediate step.

Thoughts?


-Matthias

On 2/3/26 4:55 PM, Matthias J. Sax wrote:
Thanks for the clarifications and updating the KIP.

One more follow up question (hopefully the last):

I notice that the KIP does not list a new factory method on the `Stores` interface for versioned state stores, i.e., I think

    versionedKeyValueStoreWithHeaderBuilder(...)

is missing? Is this just an oversight, or do we not need this method?



-Matthias



On 2/3/26 11:33 AM, Alieh Saeedi via dev wrote:
Hi all,
The signature has been corrected to `convertToHeaderFormat(final byte[] value)`; the previous signature, `convertToHeaderFormat(final byte[] key, final byte[] value)`, was a mistake.

Bests,
Alieh

On Mon, Feb 2, 2026 at 7:34 PM Alieh Saeedi <[email protected]> wrote:

Hi all

Regarding SessionStoreWithHeaders, we reintroduced the
AggregationWithHeaders data type so we no longer rely on
ValueTimestampHeaders, thereby avoiding storage of unnecessary timestamps
for sessions in session stores.

Bests,
Alieh

On Thu, Jan 29, 2026 at 9:26 PM Alieh Saeedi <[email protected]> wrote:

Hey Matthias,

Thanks for all the good points you raised.

MJS-5: It’s good that we walked through all the possible options. It
initially seemed like a nice idea, but as you pointed out, there’s no real
benefit and we might even end up with longer values. In both formats we
still need to compute an offset by reading a varint to retrieve the value. I also updated the KIP and added a few sentences to clarify why we store
headers_size as well.
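
To make the lazy-deserialization point concrete, here is a minimal Java sketch. It assumes a `[headers_size][headers][payload]` layout with `headers_size` encoded as an unsigned varint; the class and method names are hypothetical and not taken from the KIP, and the real encoding may differ (for example, in the varint flavor used).

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Kafka Streams or the KIP.
final class LazyHeaderSkipSketch {

    // Writes an unsigned varint: 7 data bits per byte, high bit set while more bytes follow.
    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    // Reads an unsigned varint written by writeUnsignedVarint.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
        return value | (b << shift);
    }

    // Assumed layout: [headers_size (varint)][headers][payload].
    static byte[] encode(byte[] headers, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + headers.length + payload.length);
        writeUnsignedVarint(headers.length, buf);
        buf.put(headers).put(payload);
        return Arrays.copyOf(buf.array(), buf.position());
    }

    // Returns the payload without parsing the headers: read the size, then jump over them.
    static byte[] payloadOnly(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int headersSize = readUnsignedVarint(buf);
        buf.position(buf.position() + headersSize);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```

With the alternative `[payload_size][payload][headers]` order, the reader would still decode one varint before it can slice out the payload, which matches the observation that neither order avoids the offset computation.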

MJS-6: Yes, KIP‑258 already applied the same pattern for window/session
stores: KV uses dual column families with lazy per‑entry migration, while window/session stores do a clean break at the segment level so old segments stay in the legacy format and new ones use the new format. Segment‑level
versioning maps well to windows/sessions because they’re already
time‑segmented and constrained by retention, so we avoid dual‑CF complexity in every small segment DB while still getting a natural rolling upgrade as old segments age out. Compared with a dual‑CF “dual accessor” approach for window/session, the clean break is less code, easier to reason about, and reduces RocksDB overhead, with the trade‑off that legacy segments never get backfilled with new metadata unless one explicitly rebuilds or migrates the
state. I added some clarification to the KIP on that point as well.
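
As a rough illustration of the lazy per-entry migration used for KV stores, the following Java sketch models the two column families as in-memory maps. Everything here is a simplification under stated assumptions: the class name is hypothetical, the maps stand in for RocksDB column families, and `convertToHeaderFormat` simply prepends a single zero byte (a varint `headers_size` of 0), which is an assumed encoding for "no headers" and not the KIP's confirmed format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a dual-column-family accessor; not the actual Kafka Streams code.
final class DualColumnFamilySketch {
    // Stand-ins for the legacy and the new (header-aware) column family.
    private final Map<String, byte[]> legacyCf = new HashMap<>();
    private final Map<String, byte[]> headerCf = new HashMap<>();

    // Assumed conversion: prepend a one-byte varint 0, meaning "no headers".
    static byte[] convertToHeaderFormat(final byte[] value) {
        final byte[] converted = new byte[value.length + 1];
        // converted[0] stays 0 (empty headers section)
        System.arraycopy(value, 0, converted, 1, value.length);
        return converted;
    }

    // Simulates data that was written before the upgrade.
    void putLegacy(final String key, final byte[] value) {
        legacyCf.put(key, value);
    }

    // New writes always go to the new column family; any stale legacy entry is dropped.
    void put(final String key, final byte[] valueInHeaderFormat) {
        legacyCf.remove(key);
        headerCf.put(key, valueInHeaderFormat);
    }

    // Reads check the new column family first and migrate a legacy entry on first access.
    byte[] get(final String key) {
        final byte[] current = headerCf.get(key);
        if (current != null) {
            return current;
        }
        final byte[] legacy = legacyCf.get(key);
        if (legacy == null) {
            return null;
        }
        final byte[] converted = convertToHeaderFormat(legacy);
        put(key, converted);
        return converted;
    }

    int legacySize() {
        return legacyCf.size();
    }

    int headerSize() {
        return headerCf.size();
    }
}
```

In this model every read may have to consult two column families until the entry has been migrated. The segment-level clean break described above for window/session stores avoids that double lookup, because each segment is written entirely in one format.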

MJS-7 and MJS-8: Great catches—I've updated the Compatibility and Testing
sections accordingly.

Best,
Alieh

On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote:

Great discussion! Seems we are heading in the right direction.

Thanks for clarifying the open question about the header serialization
format, VersionedRecordWithHeaders, StateSerdes, and upgrade path.


A few follow up questions:

MJS-5: As we are keeping `headers_size` now, I am wondering if there
would be a benefit to change the byte format to the same order as used
in Kafka messages, ie

    [payload_size][payload][headers_byte]

The only disadvantage I see would be, that I expect `header_size` to be
smaller than `payload_size` for most cases, so we might need a little
bit more space on average for the var-int encoding. But in both cases,
we would be able to implement lazy deserialization. Not saying we have
to do it this way -- in general I agree there is not much benefit to use the same order as Kafka messages do as it was already pointed out. Just
wanted to mention it for completeness. Thoughts?


MJS-5-B: One request though: the KIP should explain why we need to add
`header_size` (or `payload_size` in case we really make this change).
Reading the KIP as-is, I would always ask myself why we would need
`header_size` -- so mentioning lazy deserialization explicitly as reason why we add this field would be great to not puzzle readers about it. --
The KIP mentions lazy-deserialization later in the "Compatibility"
section, but does not make the connection to `header_size` field
explicit in this section either.


MJS-6. For the upgrade path the KIP mentions

Window/Session: Employs a clean break at the segment level—old
segments stay as-is; new segments use the new format.

I am wondering why we do it this way? Did KIP-258 also do this? (I cannot remember.) It's an interesting idea. I am just wondering about pros/cons
compared to following the same dual-cf-accessor path as we do for
non-windowed stores. Also from an implementation POV -- would it be more
or less code to write?


MJS-7. In the "Compatibility" section the KIP states

Backward Compatibility
- Public API: No existing APIs are deprecated. The new header-aware
interfaces and factory methods are additive.

As we deprecate some methods on `StateSerdes` now, this is not correct
any longer and should be updated.


MJS-8: Testing. -- There is no mention of system tests. And maybe we
don't need any. But it might be good to be explicit. Did KIP-258 add new
system tests?



@TengYao: Yes, your understanding of KS/Windowed vs Session store is
correct. It's really all about the optimization to avoid storing "event time" for sessions twice, as we know "event time == window-end". That's why using `ValueTimestampHeaders` for header-session store might not be
ideal, as we would lose this optimization. Introducing
`AggregationWithHeaders` is an attempt to keep this optimization though.




-Matthias




On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
Updates to KIP

1- A varint header_size field is introduced to enable lazy deserialization when scanning large ranges.

2- The current serialization/deserialization methods in StateSerdes are marked as deprecated to keep the class concise.

3- Note that VersionedKeyValueStoreWithHeaders cannot extend VersionedKeyValueStore because their methods differ in input and/or output types. In particular, the VersionedRecord returned by VersionedKeyValueStore methods is a final class and therefore cannot be subclassed.
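
The constraint in the third point can be sketched in Java as follows. All types below are simplified, hypothetical stand-ins (hence the `Sketch` suffix) and do not reproduce the real Kafka Streams classes or the KIP's final API; in particular, wrapping the plain record is only one possible design and is assumed here for illustration.

```java
import java.util.Map;

// Stand-in for a final record class: it cannot be subclassed.
final class VersionedRecordSketch<V> {
    final V value;
    final long timestamp;

    VersionedRecordSketch(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// "extends VersionedRecordSketch<V>" would be rejected by the compiler,
// so this sketch wraps the plain record instead (an assumed design).
final class VersionedRecordWithHeadersSketch<V> {
    final VersionedRecordSketch<V> record;
    final Map<String, byte[]> headers;

    VersionedRecordWithHeadersSketch(final VersionedRecordSketch<V> record,
                                     final Map<String, byte[]> headers) {
        this.record = record;
        this.headers = headers;
    }
}

interface VersionedStoreSketch<K, V> {
    VersionedRecordSketch<V> get(K key);
}

// A sub-interface of VersionedStoreSketch could only override get() with a subtype of
// VersionedRecordSketch as return type. No such subtype can exist, so the header-aware
// store has to be a separate interface.
interface VersionedStoreWithHeadersSketch<K, V> {
    VersionedRecordWithHeadersSketch<V> get(K key);
}
```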

Thanks,
Alieh

On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]>
wrote:

chia_03: Regarding the header size, using a Varint is consistent with
Kafka's serialization standards. It avoids the overhead of a large
fixed-size field while still achieving the efficient skipping
capability we want.

chia_04: That makes sense.

Alieh Saeedi via dev <[email protected]> 於 2026年1月15日週四 下午10:59寫道:

Hi Chia-Ping Tsai,

Thanks for the feedback.

chia_03: The difficulty with adding a header length is deciding between a
fixed-size field for all records or a configuration allowing users to
define a maximum. Alternatively, we could consider using a varint for the
header length to remain flexible and space-efficient.

chia_04: It only makes sense to give the second column family its own RocksDB
config if its access pattern or data characteristics are materially
different. Here we have the same keys, the same or very similar read/write
patterns (e.g., same get, put, range queries), and roughly comparable value
sizes (CF2 slightly larger per entry). So from RocksDB’s perspective the two
CFs behave very similarly: both are generic key–value blobs, written and read
with the same pattern. Most of the important RocksDB options (compaction
style, write buffer sizes, block cache, bloom filters, etc.) would be tuned
the same way for both.
Do you see a huge difference between these two?

Thanks,
Alieh

On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]>
wrote:

hi

chia_03: Should we provide a more efficient way to load the value without
scanning the header bytes (e.g., by storing the total size of headers)?

chia_04: Do we need to allow a separate RocksDB configuration for the new
column family?

Best,
Chia-Ping

On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
Hi all,

I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
Streams state stores to preserve record headers.
This would let header-based metadata like schema IDs, tracing info, and
feature flags be stored and restored alongside values.
The KIP introduces header-aware store types and a small config to cap the
size of headers written into state.
Details are in the KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores

I’d appreciate your feedback and questions on the proposal.

Thanks,
Alieh









