Thanks for the KIP Alieh. Very exciting to see this getting started.
It's long standing gap in Kafka Streams.
Couple of questions:
MJS1: Why do we need `streams.store.headers.max.bytes` config? We
currently do not limit the size of the key or value inside KS layer, and
I don't see why we would need a bound for headers? Of course, there is
Kafka configs which limit the size of a message, and from my
understanding it would implicitly also cover headers. So I am wondering
what we gain by this config?
MJS2: What is the purpose of `AggregationWithHeaders` class?
MJS3: You propose to add a new `HeaderByteStore` interface (similar to
`TimestampedBytesStore`), but I believe its purpose is not just to
provide a static method to convert the format. I believe it must also be
used as a "marker interface", to tell the runtime if the
internal/underlying RocksDBStore is a "headers store" or not (this is
necessary for backward compatibility, and upgrading). This change
implies that if a user implements a custom store, they would need to
also implement this interface if the store supports headers (and can be
plugging into the DSL as "header store" -- of course, DSL support is a
future KIP only, but I think we should already cover this part correctly
from day one on, to ensure we don't hit issue down the road). If a
(byte) store does not implement the `HeaderBytesStore` interface, it
means that the returned value-byte do not have the propose
"header+payload" format. The KIP should explain this in more details.
MJS4: For `StateSerdes`, I am wondering if we should deprecate the
existing methods for which new overloads including a `Headers` parameter
are added? If a store does not support headers, it would just ignore
this new parameter, and we could remove the existing methods with 5.0
(?) release and keep the API surface area smaller. Or do we still need
the existing methods?
-Matthias
On 1/9/26 5:20 PM, Chia-Ping Tsai wrote:
hi Alieh
Thanks for the KIP. This proposal seems to open the door for many interesting
use cases. I have a few questions?
chia_0: could you clarify the serialization format of headers_bytes?
chia_1: how does the state store distinguish between legacy values and new
values with headers? Since the new format starts with a 2-bytes length, is
there a risk of ambiguity with existing data?
chia_2: does the implementation guarantee that the order of headers is preserved
Best,
Chia-Ping
Alieh Saeedi via dev <[email protected]> 於 2026年1月10日 清晨6:14 寫道:
Hi all,
I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
Streams state stores to preserve record headers.
This would let header-based metadata like schema IDs, tracing info, and
feature flags be stored and restored alongside values.
The KIP introduces header-aware store types and a small config to cap the
size of headers written into state.
Details are in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
.
I’d appreciate your feedback and questions on the proposal.
Thanks,
Alieh