Hey Matthias,

Thanks for asking for additional details and clarification. I’ve added a section *“Up/downgrade limitations and required steps”* to make these points explicit, and I’ve also added the missing versionedKeyValueStoreWithHeadersBuilder() method to the KIP.

Bests,
Alieh

On Wed, Feb 4, 2026 at 2:27 AM Matthias J. Sax <[email protected]> wrote:

> Actually one more question:
>
> Do we support upgrading to the new header store from both `KeyValueStore` and `TimestampedKeyValueStore` (similar for windowed store)? The KIP is not explicit about it.
>
> Not sure if it would be complex to support both upgrades -- it does sound useful. Of course, it's not strictly necessary, because users could upgrade to the timestamped store first as an intermediate step, too.
>
> Thoughts?
>
> -Matthias
>
> On 2/3/26 4:55 PM, Matthias J. Sax wrote:
> > Thanks for the clarifications and updating the KIP.
> >
> > One more follow up question (hopefully the last):
> >
> > I notice that the KIP does not list a new factory method on the `Stores` interface for versioned state stores, ie, I think
> >
> > versionedKeyValueStoreWithHeaderBuilder(...)
> >
> > is missing? Is this just an oversight, or do we not need this method?
> >
> > -Matthias
> >
> > On 2/3/26 11:33 AM, Alieh Saeedi via dev wrote:
> >> Hi all,
> >> The `convertToHeaderFormat(final byte[] value)` has been corrected from the previous, mistaken signature `convertToHeaderFormat(final byte[] key, final byte[] value)`.
> >>
> >> Bests,
> >> Alieh
> >>
> >> On Mon, Feb 2, 2026 at 7:34 PM Alieh Saeedi <[email protected]> wrote:
> >>
> >>> Hi all
> >>>
> >>> Regarding SessionStoreWithHeaders, we reintroduced the AggregationWithHeaders data type so we no longer rely on ValueTimestampHeaders, thereby avoiding storage of unnecessary timestamps for sessions in session stores.
> >>>
> >>> Bests,
> >>> Alieh
> >>>
> >>> On Thu, Jan 29, 2026 at 9:26 PM Alieh Saeedi <[email protected]> wrote:
> >>>
> >>>> Hey Matthias,
> >>>>
> >>>> Thanks for all the good points you raised.
> >>>>
> >>>> MJS-5: It’s good that we walked through all the possible options.
> >>>> It initially seemed like a nice idea, but as you pointed out, there’s no real benefit and we might even end up with longer values. In both formats we still need to compute an offset by reading a varint to retrieve the value. I also updated the KIP and added a few sentences to clarify why we store headers_size as well.
> >>>>
> >>>> MJS-6: Yes, KIP-258 already applied the same pattern for window/session stores: KV uses dual column families with lazy per-entry migration, while window/session stores do a clean break at the segment level so old segments stay in the legacy format and new ones use the new format. Segment-level versioning maps well to windows/sessions because they’re already time-segmented and constrained by retention, so we avoid dual-CF complexity in every small segment DB while still getting a natural rolling upgrade as old segments age out. Compared with a dual-CF “dual accessor” approach for window/session, the clean break is less code, easier to reason about, and reduces RocksDB overhead, with the trade-off that legacy segments never get backfilled with new metadata unless one explicitly rebuilds or migrates the state. I added some clarification to the KIP on that point as well.
> >>>>
> >>>> MJS-7 and MJS-8: Great catches -- I've updated the Compatibility and Testing sections accordingly.
> >>>>
> >>>> Best,
> >>>> Alieh
> >>>>
> >>>> On Tue, Jan 27, 2026 at 6:50 PM Matthias J. Sax <[email protected]> wrote:
> >>>>
> >>>>> Great discussion! Seems we are heading in the right direction.
> >>>>>
> >>>>> Thanks for clarifying the open question about the header serialization format, VersionedRecordWithHeaders, StateSerdes, and upgrade path.
> >>>>>
> >>>>> A few follow up questions:
> >>>>>
> >>>>> MJS-5: As we are keeping `headers_size` now, I am wondering if there would be a benefit to change the byte format to the same order as used in Kafka messages, ie
> >>>>>
> >>>>> [payload_size][payload][headers_byte]
> >>>>>
> >>>>> The only disadvantage I see would be that I expect `header_size` to be smaller than `payload_size` for most cases, so we might need a little bit more space on average for the var-int encoding. But in both cases, we would be able to implement lazy deserialization. Not saying we have to do it this way -- in general I agree there is not much benefit to use the same order as Kafka messages do as it was already pointed out. Just wanted to mention it for completeness. Thoughts?
> >>>>>
> >>>>> MJS-5-B: One request though: the KIP should explain why we need to add `header_size` (or `payload_size` in case we really make this change). Reading the KIP as-is, I would always ask myself why we would need `header_size` -- so mentioning lazy deserialization explicitly as the reason why we add this field would be great to not puzzle readers about it. -- The KIP mentions lazy-deserialization later in the "Compatibility" section, but does not make the connection to the `header_size` field explicit in this section either.
> >>>>>
> >>>>> MJS-6. For the upgrade path the KIP mentions
> >>>>>
> >>>>>> Window/Session: Employs a clean break at the segment level -- old segments stay as-is; new segments use the new format.
> >>>>>
> >>>>> I am wondering why we do it this way? Did KIP-258 also do this (I cannot remember). It's an interesting idea. I am just wondering about pros/cons compared to following the same dual-cf-accessor path as we do for non-windowed stores.
> >>>>> Also from an implementation POV -- would it be more or less code to write?
> >>>>>
> >>>>> MJS-7. In the "Compatibility" section the KIP states
> >>>>>
> >>>>>> Backward Compatibility
> >>>>>> - Public API: No existing APIs are deprecated. The new header-aware interfaces and factory methods are additive.
> >>>>>
> >>>>> As we deprecate some methods on `StateSerdes` now, this is not correct any longer and should be updated.
> >>>>>
> >>>>> MJS-8: Testing. -- There is no mention of system tests. And maybe we don't need any. But might be good to be explicit. Did KIP-258 add new system tests?
> >>>>>
> >>>>> @TengYao: Yes, your understanding of KS/Windowed vs Session store is correct. It's really all about the optimization to avoid storing "event time" for sessions twice, as we know "event time == window-end". That's why using `ValueTimestampHeaders` for header-session store might not be ideal, as we would lose this optimization. Introducing `AggregationWithHeaders` is an attempt to keep this optimization though.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
> >>>>>> Updates to KIP
> >>>>>>
> >>>>>> 1- A varint header_size field is introduced to enable lazy deserialization when scanning large ranges.
> >>>>>> 2- The current serialization/deserialization methods in StateSerdes are marked as deprecated to keep the class concise.
> >>>>>> 3- Note that VersionedKeyValueStoreWithHeaders cannot extend VersionedKeyValueStore because their methods differ in input and/or output types.
> >>>>>> In particular, the VersionedRecord returned by VersionedKeyValueStore methods is a final class and therefore cannot be subclassed.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Alieh
> >>>>>>
> >>>>>> On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]> wrote:
> >>>>>>
> >>>>>>> chia_03: Regarding the header size, using a Varint is consistent with Kafka's serialization standards. It avoids the overhead of a large fixed-size field while still achieving the efficient skipping capability we want.
> >>>>>>>
> >>>>>>> chia_04: That makes sense.
> >>>>>>>
> >>>>>>> Alieh Saeedi via dev <[email protected]> wrote on Thu, Jan 15, 2026 at 10:59 PM:
> >>>>>>>
> >>>>>>>> Hi Chia-Ping Tsai,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback.
> >>>>>>>>
> >>>>>>>> chia_03: The difficulty with adding a header length is deciding between a fixed-size field for all records or a configuration allowing users to define a maximum. Alternatively, we could consider using a varint for the header length to remain flexible and space-efficient.
> >>>>>>>>
> >>>>>>>> chia_04:
> >>>>>>>> It only makes sense to give the second column family its own RocksDB config if its access pattern or data characteristics are materially different. Here we have the same keys, the same or very similar read/write patterns (e.g., same get, put, range queries), and roughly comparable value sizes (CF2 slightly larger per entry). Then from RocksDB’s perspective the two CFs behave very similarly: both are generic key-value blobs, written and read with the same pattern. Most of the important RocksDB options (compaction style, write buffer sizes, block cache, bloom filters, etc.) would be tuned the same way for both.
> >>>>>>>> Do you see a huge difference between these two?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Alieh
> >>>>>>>>
> >>>>>>>> On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> hi
> >>>>>>>>>
> >>>>>>>>> chia_03: should we provide a more effective way to load the value without scanning the header bytes? (e.g., by storing the total size of headers)
> >>>>>>>>>
> >>>>>>>>> chia_04: Do we need to allow separate RocksDB configuration for the new column family?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Chia-Ping
> >>>>>>>>>
> >>>>>>>>> On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I’d like to start a discussion on KIP-1271, which proposes allowing Kafka Streams state stores to preserve record headers.
> >>>>>>>>>> This would let header-based metadata like schema IDs, tracing info, and feature flags be stored and restored alongside values.
> >>>>>>>>>> The KIP introduces header-aware store types and a small config to cap the size of headers written into state.
> >>>>>>>>>> Details are in the KIP:
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> >>>>>>>>>> I’d appreciate your feedback and questions on the proposal.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Alieh
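
As a rough illustration of the header-size idea discussed under chia_03 and MJS-5, the sketch below stores a varint length in front of the header bytes so that a reader can jump straight to the value without parsing the headers. The layout `[headers_size][headers][payload]`, the class name `HeaderSizeSketch`, and the hand-rolled unsigned varint are assumptions made for this example only; the KIP defines the actual byte format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderSizeSketch {

    // Writes an unsigned varint: 7 data bits per byte, high bit set on all but the last byte.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Reads an unsigned varint and leaves the buffer positioned just past it.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
    }

    // Assumed layout for this sketch: [headers_size (varint)][headers bytes][payload bytes].
    static byte[] encode(byte[] headers, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(headers.length, out);
        out.write(headers, 0, headers.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Lazy read: decode only the varint, then skip over the header bytes untouched.
    static byte[] payloadOnly(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int headersSize = readUnsignedVarint(buf);
        int payloadStart = buf.position() + headersSize;
        return Arrays.copyOfRange(stored, payloadStart, stored.length);
    }

    public static void main(String[] args) {
        byte[] headers = "schema-id=42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "value".getBytes(StandardCharsets.UTF_8);
        byte[] stored = encode(headers, payload);
        System.out.println(new String(payloadOnly(stored), StandardCharsets.UTF_8));
    }
}
```

With the alternative order from MJS-5, `[payload_size][payload][headers_byte]`, the reader would do the same single varint decode; only the field that is measured changes, which matches the observation in the thread that neither order avoids the offset computation.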
