Hey Matthias

Thanks for asking for additional details and clarification.
I’ve added a section *“Up/downgrade limitations and required steps”* to
make these points explicit, and I’ve also added the missing
versionedKeyValueStoreWithHeadersBuilder() method to the KIP.

Bests,
Alieh

On Wed, Feb 4, 2026 at 2:27 AM Matthias J. Sax <[email protected]> wrote:

> Actually one more question:
>
> Do we support upgrading to the new header store from both
> `KeyValueStore` and `TimestampedKeyValueStore` (similar for windowed
> store)? The KIP is not explicit about it.
>
> Not sure if it would be complex to support both upgrades -- it does
> sound useful. Of course, it's not strictly necessary, because users could
> upgrade to the timestamped store first as an intermediate step, too.
>
> Thoughts?
>
>
> -Matthias
>
> On 2/3/26 4:55 PM, Matthias J. Sax wrote:
> > Thanks for the clarifications and updating the KIP.
> >
> > One more follow up question (hopefully the last):
> >
> > I notice that the KIP does not list a new factory method on the `Stores`
> > interface for versioned state stores, i.e., I think
> >
> >     versionedKeyValueStoreWithHeaderBuilder(...)
> >
> > is missing? Is this just an oversight, or do we not need this method?
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 2/3/26 11:33 AM, Alieh Saeedi via dev wrote:
> >> Hi all,
> >> The `convertToHeaderFormat(final byte[] value)` signature has been
> >> corrected from the previous, mistaken signature
> >> `convertToHeaderFormat(final byte[] key, final byte[] value)`.
> >>
> >> Bests,
> >> Alieh
> >>
> >> On Mon, Feb 2, 2026 at 7:34 PM Alieh Saeedi <[email protected]> wrote:
> >>
> >>> Hi all
> >>>
> >>> Regarding SessionStoreWithHeaders, we reintroduced the
> >>> AggregationWithHeaders data type so we no longer rely on
> >>> ValueTimestampHeaders, thereby avoiding storage of unnecessary
> >>> timestamps for sessions in session stores.
> >>>
> >>> Bests,
> >>> Alieh
> >>>
> >>> On Thu, Jan 29, 2026 at 9:26 PM Alieh Saeedi <[email protected]> wrote:
> >>>
> >>>> Hey Matthias,
> >>>>
> >>>> Thanks for all the good points you raised.
> >>>>
> >>>> MJS-5: It’s good that we walked through all the possible options. It
> >>>> initially seemed like a nice idea, but as you pointed out, there’s no
> >>>> real benefit and we might even end up with longer values. In both
> >>>> formats we still need to compute an offset by reading a varint to
> >>>> retrieve the value. I also updated the KIP and added a few sentences
> >>>> to clarify why we store headers_size as well.
> >>>>
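To make the role of headers_size concrete, here is a minimal sketch of how a reader could skip the header bytes and return only the payload. It assumes a value layout of `[headers_size (unsigned varint)][headers bytes][payload bytes]`; the class and method names are made up for illustration and are not part of the KIP, and the varint helpers are hand-rolled rather than taken from Kafka.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the KIP: shows how headers_size enables skipping headers.
public class HeaderAwareValueSketch {

    // Write an unsigned varint: 7 bits per byte, high bit set on every byte except the last.
    public static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Read an unsigned varint and leave the buffer positioned right after it.
    public static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
        }
        return value | (b << shift);
    }

    // Assumed layout: [headers_size][headers bytes][payload bytes].
    public static byte[] serialize(byte[] headers, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(headers.length, out);
        out.write(headers, 0, headers.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Lazy read: jump over the header bytes without deserializing them.
    public static byte[] payloadOnly(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int headersSize = readUnsignedVarint(buf);
        buf.position(buf.position() + headersSize);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] headers = "trace-id=abc".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "value".getBytes(StandardCharsets.UTF_8);
        byte[] stored = serialize(headers, payload);
        // Only the varint is decoded; the header bytes are never parsed.
        System.out.println(new String(payloadOnly(stored), StandardCharsets.UTF_8));
    }
}
```

With the alternative order `[payload_size][payload][headers]`, the reader would still decode one varint first, which is why neither order saves work on the read path.
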
> >>>> MJS-6: Yes, KIP‑258 already applied the same pattern for
> >>>> window/session stores: KV uses dual column families with lazy
> >>>> per‑entry migration, while window/session stores do a clean break at
> >>>> the segment level so old segments stay in the legacy format and new
> >>>> ones use the new format. Segment‑level versioning maps well to
> >>>> windows/sessions because they’re already time‑segmented and
> >>>> constrained by retention, so we avoid dual‑CF complexity in every
> >>>> small segment DB while still getting a natural rolling upgrade as old
> >>>> segments age out. Compared with a dual‑CF “dual accessor” approach
> >>>> for window/session, the clean break is less code, easier to reason
> >>>> about, and reduces RocksDB overhead, with the trade‑off that legacy
> >>>> segments never get backfilled with new metadata unless one explicitly
> >>>> rebuilds or migrates the state. I added some clarification to the KIP
> >>>> on that point as well.
> >>>>
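The lazy per-entry migration for KV stores can be sketched roughly as follows. This is only an illustration under stated assumptions: two in-memory maps stand in for the legacy and new column families, keys are plain strings, and `convertToHeaderFormat` is assumed to prepend an empty headers block (a single zero byte for headers_size). The real conversion and accessor code may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a store backed by two column families.
public class DualColumnFamilySketch {
    // Legacy column family: values in the old format, without headers.
    private final Map<String, byte[]> oldCf = new HashMap<>();
    // New column family: values in the header-aware format.
    private final Map<String, byte[]> newCf = new HashMap<>();

    // Assumption: a legacy value is upgraded by prepending headers_size = 0.
    public static byte[] convertToHeaderFormat(final byte[] value) {
        byte[] converted = new byte[value.length + 1];
        converted[0] = 0; // varint-encoded headers_size of zero
        System.arraycopy(value, 0, converted, 1, value.length);
        return converted;
    }

    // Simulates data that was written before the upgrade.
    public void putLegacy(String key, byte[] plainValue) {
        oldCf.put(key, plainValue);
    }

    // New writes always go to the new column family; any stale legacy entry is dropped.
    public void put(String key, byte[] valueWithHeaders) {
        newCf.put(key, valueWithHeaders);
        oldCf.remove(key);
    }

    // Reads check the new column family first, then migrate a legacy entry on first access.
    public byte[] get(String key) {
        byte[] value = newCf.get(key);
        if (value != null) {
            return value;
        }
        byte[] legacy = oldCf.remove(key);
        if (legacy == null) {
            return null;
        }
        byte[] converted = convertToHeaderFormat(legacy);
        newCf.put(key, converted);
        return converted;
    }

    public int legacyEntries() {
        return oldCf.size();
    }

    public static void main(String[] args) {
        DualColumnFamilySketch store = new DualColumnFamilySketch();
        store.putLegacy("k", new byte[] {7, 8});
        store.get("k"); // first read migrates the entry
        System.out.println("legacy entries left: " + store.legacyEntries());
    }
}
```

Window and session stores skip this per-entry bookkeeping: each segment is written entirely in one format, so no second column family is needed inside a segment.
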
> >>>> MJS-7 and MJS-8: Great catches. I've updated the Compatibility and
> >>>> Testing sections accordingly.
> >>>>
> >>>> Best,
> >>>> Alieh
> >>>>
> >>>> On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote:
> >>>>
> >>>>> Great discussion! Seems we are heading in the right direction.
> >>>>>
> >>>>> Thanks for clarifying the open question about the header
> serialization
> >>>>> format, VersionedRecordWithHeaders, StateSerdes, and upgrade path.
> >>>>>
> >>>>>
> >>>>> A few follow up questions:
> >>>>>
> >>>>> MJS-5: As we are keeping `headers_size` now, I am wondering if there
> >>>>> would be a benefit to changing the byte format to the same order as
> >>>>> used in Kafka messages, i.e.,
> >>>>>
> >>>>>     [payload_size][payload][headers_byte]
> >>>>>
> >>>>> The only disadvantage I see is that I expect `header_size` to be
> >>>>> smaller than `payload_size` in most cases, so we might need a little
> >>>>> more space on average for the varint encoding. But in both cases, we
> >>>>> would be able to implement lazy deserialization. Not saying we have
> >>>>> to do it this way -- in general I agree there is not much benefit to
> >>>>> using the same order as Kafka messages do, as was already pointed
> >>>>> out. Just wanted to mention it for completeness. Thoughts?
> >>>>>
> >>>>>
> >>>>> MJS-5-B: One request though: the KIP should explain why we need to
> >>>>> add `header_size` (or `payload_size` in case we really make this
> >>>>> change). Reading the KIP as-is, I would always ask myself why we
> >>>>> would need `header_size`, so mentioning lazy deserialization
> >>>>> explicitly as the reason for this field would be great to avoid
> >>>>> puzzling readers. The KIP mentions lazy deserialization later in the
> >>>>> "Compatibility" section, but does not make the connection to the
> >>>>> `header_size` field explicit in this section either.
> >>>>>
> >>>>>
> >>>>> MJS-6. For the upgrade path the KIP mentions
> >>>>>
> >>>>>> Window/Session: Employs a clean break at the segment level—old
> >>>>> segments stay as-is; new segments use the new format.
> >>>>>
> >>>>> I am wondering why we do it this way? Did KIP-258 also do this (I
> >>>>> cannot remember)? It's an interesting idea. I am just wondering
> >>>>> about pros/cons compared to following the same dual-cf-accessor path
> >>>>> as we do for non-windowed stores. Also from an implementation POV:
> >>>>> would it be more or less code to write?
> >>>>>
> >>>>>
> >>>>> MJS-7. In the "Compatibility" section the KIP states
> >>>>>
> >>>>>> Backward Compatibility
> >>>>>> - Public API: No existing APIs are deprecated. The new header-aware
> >>>>> interfaces and factory methods are additive.
> >>>>>
> >>>>> As we deprecate some methods on `StateSerdes` now, this is no longer
> >>>>> correct and should be updated.
> >>>>>
> >>>>>
> >>>>> MJS-8: Testing. -- There is no mention of system tests. Maybe we
> >>>>> don't need any, but it might be good to be explicit. Did KIP-258 add
> >>>>> new system tests?
> >>>>>
> >>>>>
> >>>>>
> >>>>> @TengYao: Yes, your understanding of KS/Windowed vs Session store is
> >>>>> correct. It's really all about the optimization to avoid storing
> >>>>> "event time" for sessions twice, as we know "event time ==
> >>>>> window-end". That's why using `ValueTimestampHeaders` for the
> >>>>> header-session store might not be ideal, as we would lose this
> >>>>> optimization. Introducing `AggregationWithHeaders` is an attempt to
> >>>>> keep this optimization.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
> >>>>>> Updates to the KIP:
> >>>>>>
> >>>>>> 1- A varint header_size field is introduced to enable lazy
> >>>>>> deserialization when scanning large ranges.
> >>>>>>
> >>>>>> 2- The current serialization/deserialization methods in StateSerdes
> >>>>>> are marked as deprecated to keep the class concise.
> >>>>>>
> >>>>>> 3- Note that VersionedKeyValueStoreWithHeaders cannot extend
> >>>>>> VersionedKeyValueStore because their methods differ in input and/or
> >>>>>> output types. In particular, the VersionedRecord returned by
> >>>>>> VersionedKeyValueStore methods is a final class and therefore cannot
> >>>>>> be subclassed.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Alieh
> >>>>>>
> >>>>>> On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]> wrote:
> >>>>>>
> >>>>>>> chia_03: Regarding the header size, using a varint is consistent
> >>>>>>> with Kafka's serialization standards. It avoids the overhead of a
> >>>>>>> large fixed-size field while still achieving the efficient skipping
> >>>>>>> capability we want.
> >>>>>>>
> >>>>>>> chia_04: That makes sense.
> >>>>>>>
> >>>>>>> On Thu, Jan 15, 2026 at 10:59 PM Alieh Saeedi via dev
> >>>>>>> <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi Chia-Ping Tsai,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback.
> >>>>>>>>
> >>>>>>>> chia_03: The difficulty with adding a header length is deciding
> >>>>>>>> between a fixed-size field for all records or a configuration
> >>>>>>>> allowing users to define a maximum. Alternatively, we could
> >>>>>>>> consider using a varint for the header length to remain flexible
> >>>>>>>> and space-efficient.
> >>>>>>>>
> >>>>>>>> chia_04:
> >>>>>>>> It only makes sense to give the second column family its own
> >>>>>>>> RocksDB config if its access pattern or data characteristics are
> >>>>>>>> materially different. Here we have the same keys, the same or very
> >>>>>>>> similar read/write patterns (e.g., same get, put, range queries),
> >>>>>>>> and roughly comparable value sizes (CF2 slightly larger per
> >>>>>>>> entry). So from RocksDB’s perspective the two CFs behave very
> >>>>>>>> similarly: both are generic key–value blobs, written and read with
> >>>>>>>> the same pattern. Most of the important RocksDB options
> >>>>>>>> (compaction style, write buffer sizes, block cache, bloom filters,
> >>>>>>>> etc.) would be tuned the same way for both.
> >>>>>>>> Do you see a big difference between the two?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Alieh
> >>>>>>>>
> >>>>>>>> On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> hi
> >>>>>>>>>
> >>>>>>>>> chia_03: Should we provide a more effective way to load the value
> >>>>>>>>> without scanning the header bytes (e.g., by storing the total size
> >>>>>>>>> of headers)?
> >>>>>>>>>
> >>>>>>>>> chia_04: Do we need to allow a separate RocksDB configuration for
> >>>>>>>>> the new column family?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Chia-Ping
> >>>>>>>>>
> >>>>>>>>> On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I’d like to start a discussion on KIP-1271, which proposes
> >>>>>>>>>> allowing Kafka Streams state stores to preserve record headers.
> >>>>>>>>>> This would let header-based metadata like schema IDs, tracing
> >>>>>>>>>> info, and feature flags be stored and restored alongside values.
> >>>>>>>>>> The KIP introduces header-aware store types and a small config to
> >>>>>>>>>> cap the size of headers written into state.
> >>>>>>>>>> Details are in the KIP:
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> >>>>>>>>>> I’d appreciate your feedback and questions on the proposal.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Alieh
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>
> >
>
>
