Thanks!

On 2/5/26 2:27 AM, Alieh Saeedi via dev wrote:
Hey Matthias

Thanks for asking for additional details and clarification.
I’ve added a section *“Up/downgrade limitations and required steps”* to
make these points explicit, and I’ve also added the missing
versionedKeyValueStoreWithHeadersBuilder() method to the KIP.

Bests,
Alieh

On Wed, Feb 4, 2026 at 2:27 AM Matthias J. Sax <[email protected]> wrote:

Actually one more question:

Do we support upgrading to the new header store from both
`KeyValueStore` and `TimestampedKeyValueStore` (similar for windowed
store)? The KIP is not explicit about it.

Not sure if it would be complex to support both upgrades -- it does
sound useful. Of course, it's not strictly necessary, because users could
upgrade to the timestamped store first as an intermediate step, too.

Thoughts?


-Matthias

On 2/3/26 4:55 PM, Matthias J. Sax wrote:
Thanks for the clarifications and updating the KIP.

One more follow up question (hopefully the last):

I notice that the KIP does not list a new factory method on the `Stores`
interface for versioned state stores, i.e., I think

     versionedKeyValueStoreWithHeaderBuilder(...)

is missing? Is this just an oversight, or do we not need this method?



-Matthias



On 2/3/26 11:33 AM, Alieh Saeedi via dev wrote:
Hi all,
The signature `convertToHeaderFormat(final byte[] value)` has been
corrected from the previous, mistaken signature
`convertToHeaderFormat(final byte[] key, final byte[] value)`.

Bests,
Alieh

On Mon, Feb 2, 2026 at 7:34 PM Alieh Saeedi <[email protected]> wrote:

Hi all

Regarding SessionStoreWithHeaders, we reintroduced the
AggregationWithHeaders data type so we no longer rely on
ValueTimestampHeaders, thereby avoiding storage of unnecessary
timestamps for sessions in session stores.

Bests,
Alieh

On Thu, Jan 29, 2026 at 9:26 PM Alieh Saeedi <[email protected]> wrote:

Hey Matthias,

Thanks for all the good points you raised.

MJS-5: It’s good that we walked through all the possible options. It
initially seemed like a nice idea, but as you pointed out, there’s no
real benefit and we might even end up with longer values. In both
formats we still need to compute an offset by reading a varint to
retrieve the value. I also updated the KIP and added a few sentences to
clarify why we store headers_size as well.

MJS-6: Yes, KIP‑258 already applied the same pattern for window/session
stores: KV uses dual column families with lazy per‑entry migration,
while window/session stores do a clean break at the segment level so old
segments stay in the legacy format and new ones use the new format.
Segment‑level versioning maps well to windows/sessions because they’re
already time‑segmented and constrained by retention, so we avoid dual‑CF
complexity in every small segment DB while still getting a natural
rolling upgrade as old segments age out. Compared with a dual‑CF “dual
accessor” approach for window/session, the clean break is less code,
easier to reason about, and reduces RocksDB overhead, with the trade‑off
that legacy segments never get backfilled with new metadata unless one
explicitly rebuilds or migrates the state. I added some clarification to
the KIP on that point as well.
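
For readers following along, the KV path (dual column families with lazy
per-entry migration) can be sketched roughly as follows. This is only an
illustration under assumptions: two in-memory maps stand in for the
RocksDB column families, the class and helper names are hypothetical,
and the conversion simply prepends a zero headers_size byte, which may
not match the exact format in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a dual-column-family accessor; not Kafka Streams code.
final class DualColumnFamilySketch {
    // Maps play the role of the two RocksDB column families.
    private final Map<String, byte[]> legacyCf = new HashMap<>();
    private final Map<String, byte[]> headersCf = new HashMap<>();

    // Assumed conversion: prefix the legacy value with headers_size = 0,
    // i.e. a single zero varint byte meaning "no headers".
    static byte[] convertToHeaderFormat(final byte[] value) {
        final byte[] converted = new byte[value.length + 1];
        System.arraycopy(value, 0, converted, 1, value.length);
        return converted;
    }

    // Simulates data written before the upgrade.
    void putLegacy(final String key, final byte[] value) {
        legacyCf.put(key, value);
    }

    // New writes always go to the new column family and retire any legacy entry.
    void put(final String key, final byte[] valueWithHeaders) {
        headersCf.put(key, valueWithHeaders);
        legacyCf.remove(key);
    }

    // Reads prefer the new column family; a legacy hit is migrated on the spot.
    byte[] get(final String key) {
        final byte[] current = headersCf.get(key);
        if (current != null) {
            return current;
        }
        final byte[] legacy = legacyCf.remove(key);
        if (legacy == null) {
            return null;
        }
        final byte[] migrated = convertToHeaderFormat(legacy);
        headersCf.put(key, migrated);
        return migrated;
    }

    int legacyEntries() {
        return legacyCf.size();
    }

    int migratedEntries() {
        return headersCf.size();
    }
}
```

Each key is converted at most once, the first time it is touched after
the upgrade, which is what makes the migration "lazy". The segment-level
clean break for window/session stores needs none of this bookkeeping.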

MJS-7 and MJS-8: Great catches. I've updated the Compatibility and
Testing sections accordingly.

Best,
Alieh

On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote:

Great discussion! Seems we are heading in the right direction.

Thanks for clarifying the open questions about the header serialization
format, VersionedRecordWithHeaders, StateSerdes, and upgrade path.


A few follow up questions:

MJS-5: As we are keeping `headers_size` now, I am wondering if there
would be a benefit to changing the byte format to the same order as used
in Kafka messages, i.e.

     [payload_size][payload][headers_byte]

The only disadvantage I see is that I expect `header_size` to be smaller
than `payload_size` in most cases, so we might need a little bit more
space on average for the varint encoding. But in both cases, we would be
able to implement lazy deserialization. Not saying we have to do it this
way -- in general I agree there is not much benefit in using the same
order as Kafka messages do, as was already pointed out. Just wanted to
mention it for completeness. Thoughts?
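
To put rough numbers on the varint point, here is a small sketch. It
assumes an unsigned LEB128-style varint with 7 payload bits per byte
(a zigzag-encoded signed varint would reach each threshold at half the
value); the class name, the helper, and the example sizes are made up
for illustration and are not taken from the KIP.

```java
// Hypothetical helper for illustration only; not part of the KIP or Kafka's API.
final class VarintSizeSketch {

    // Number of bytes an unsigned varint needs: one byte per 7 bits of value.
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(final String[] args) {
        final int headersSize = 40;    // assumed: a few small headers
        final int payloadSize = 2000;  // assumed: a typical serialized value
        System.out.println("headers_size prefix: "
            + sizeOfUnsignedVarint(headersSize) + " byte(s)");
        System.out.println("payload_size prefix: "
            + sizeOfUnsignedVarint(payloadSize) + " byte(s)");
    }
}
```

With these made-up sizes the `headers_size` prefix fits in one byte while
a `payload_size` prefix needs two, so the difference is about one byte
per record whenever the two sizes fall on different sides of a 7-bit
boundary.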


MJS-5-B: One request though: the KIP should explain why we need to add
`header_size` (or `payload_size` in case we really make this change).
Reading the KIP as-is, I would always ask myself why we would need
`header_size` -- so mentioning lazy deserialization explicitly as the
reason why we add this field would be great, to not puzzle readers. --
The KIP mentions lazy deserialization later in the "Compatibility"
section, but does not make the connection to the `header_size` field
explicit in this section either.


MJS-6. For the upgrade path the KIP mentions

Window/Session: Employs a clean break at the segment level—old
segments stay as-is; new segments use the new format.

I am wondering why we do it this way? Did KIP-258 also do this (I cannot
remember)? It's an interesting idea. I am just wondering about pros/cons
compared to following the same dual-cf-accessor path as we do for
non-windowed stores. Also from an implementation POV -- would it be more
or less code to write?


MJS-7. In the "Compatibility" section the KIP states

Backward Compatibility
- Public API: No existing APIs are deprecated. The new header-aware
interfaces and factory methods are additive.

As we deprecate some methods on `StateSerdes` now, this is no longer
correct and should be updated.


MJS-8: Testing. -- There is no mention of system tests. And maybe we
don't need any. But it might be good to be explicit. Did KIP-258 add new
system tests?



@TengYao: Yes, your understanding of KS/Windowed vs Session store is
correct. It's really all about the optimization to avoid storing "event
time" for sessions twice, as we know "event time == window-end". That's
why using `ValueTimestampHeaders` for header-session store might not be
ideal, as we would lose this optimization. Introducing
`AggregationWithHeaders` is an attempt to keep this optimization
though.




-Matthias




On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
Updates to KIP

1- A varint header_size field is introduced to enable lazy
deserialization when scanning large ranges.

2- The current serialization/deserialization methods in StateSerdes are
marked as deprecated to keep the class concise.

3- Note that VersionedKeyValueStoreWithHeaders cannot extend
VersionedKeyValueStore because their methods differ in input and/or
output types. In particular, the VersionedRecord returned by
VersionedKeyValueStore methods is a final class and therefore cannot be
subclassed.
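
As a rough illustration of point 1, the sketch below shows how a size
prefix lets a reader jump straight to the value without parsing the
headers. It assumes a layout of [header_size (varint)][headers][value]
and an unsigned LEB128-style varint; the class and method names are
hypothetical, and the real store format in the KIP may carry additional
fields such as a timestamp.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch of the size-prefixed layout; not the KIP's implementation.
final class HeaderSizeLayoutSketch {

    // Unsigned varint: 7 payload bits per byte, high bit set when more bytes follow.
    static void writeUnsignedVarint(int value, final ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(final ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
            b = buf.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }

    // Assumed layout: [header_size][headers][value].
    static byte[] serialize(final byte[] headers, final byte[] value) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(headers.length, out);
        out.write(headers, 0, headers.length);
        out.write(value, 0, value.length);
        return out.toByteArray();
    }

    // Lazy path: read the size, skip the header bytes unparsed, return the value.
    static byte[] readValueOnly(final byte[] stored) {
        final ByteBuffer buf = ByteBuffer.wrap(stored);
        final int headersSize = readUnsignedVarint(buf);
        buf.position(buf.position() + headersSize);
        final byte[] value = new byte[buf.remaining()];
        buf.get(value);
        return value;
    }
}
```

During a range scan that only needs values, `readValueOnly` touches the
size prefix and the value bytes only, so the cost of the headers is a
single position update per record.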

Thanks,
Alieh

On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]> wrote:

chia_03: Regarding the header size, using a varint is consistent with
Kafka's serialization standards. It avoids the overhead of a large
fixed-size field while still achieving the efficient skipping capability
we want.

chia_04: That makes sense.

On Thu, Jan 15, 2026 at 10:59 PM Alieh Saeedi via dev <[email protected]>
wrote:

Hi Chia-Ping Tsai,

Thanks for the feedback.

chia_03: The difficulty with adding a header length is deciding between
a fixed-size field for all records or a configuration allowing users to
define a maximum. Alternatively, we could consider using a varint for
the header length to remain flexible and space-efficient.

chia_04:
It only makes sense to give the second column family its own RocksDB
config if its access pattern or data characteristics are materially
different. Here we have the same keys, the same or very similar
read/write patterns (e.g., same get, put, range queries), and roughly
comparable value sizes (CF2 slightly larger per entry). So from
RocksDB’s perspective the two CFs behave very similarly: both are
generic key–value blobs, written and read with the same pattern. Most of
the important RocksDB options (compaction style, write buffer sizes,
block cache, bloom filters, etc.) would be tuned the same way for both.
Do you see a huge difference between these two?

Thanks,
Alieh

On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]> wrote:

hi

chia_03: Should we provide a more efficient way to load the value
without scanning the header bytes (e.g., by storing the total size of
the headers)?

chia_04: Do we need to allow a separate RocksDB configuration for the
new column family?

Best,
Chia-Ping

On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
Hi all,

I’d like to start a discussion on KIP-1271, which proposes allowing
Kafka Streams state stores to preserve record headers.
This would let header-based metadata like schema IDs, tracing info, and
feature flags be stored and restored alongside values.
The KIP introduces header-aware store types and a small config to cap
the size of headers written into state.
Details are in the KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores

I’d appreciate your feedback and questions on the proposal.

Thanks,
Alieh












