Guozhang Wang created KAFKA-7718:
------------------------------------
Summary: Allow customized header inheritance for stateful
operators in DSL
Key: KAFKA-7718
URL: https://issues.apache.org/jira/browse/KAFKA-7718
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Guozhang Wang
As a follow-up to KIP-244
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
we want to allow users to customize how record headers are inherited
while traversing the topology at the DSL layer (at the lower-level Processor
API layer, users can already customize and inherit the headers
as they forward records to the next processor nodes).
Today the headers are implicitly inherited throughout the topology without any
modifications within the Streams library. For stateless operators (filter, map,
etc.) this default inheritance policy should be sufficient. For stateful
operators, where multiple input records may generate a single output record
(i.e. an n:1 transformation rather than a 1:1 mapping), we only inherit
from the triggering record. To users this would seem to be a "random" choice,
and the other records' headers are lost.
I'd propose we extend the DSL to allow users to customize the header inheritance
policy for stateful operators, namely Joins and Aggregations. It would contain
two parts:
1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control
objects with an additional function that allows users to pass in a lambda
function (let's say it's called HeadersMerger, but the name is subject to
discussion in the KIP) that takes two Headers objects and returns a single
Headers object.
2) On the implementation layer, we need to actually store the headers in the
materialized state store so that they can be retrieved along with the record
by the join / aggregation processor. This would change the byte layout of the
state store values and hence should be considered carefully. Then, when the
join / aggregate processor is triggered, the Headers of both records will be
retrieved (one from the triggering record, one read from the materialized state
store) and passed to the HeadersMerger. Some low-hanging optimizations can be
considered though, e.g. if users have not overridden this interface, then we
can consider not reading the headers from the other side at all to save IO cost.
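
To make the two parts above more concrete, here is a minimal, self-contained
sketch in plain Java. Everything in it is an assumption for illustration only:
`SimpleHeader` is a simplified stand-in for Kafka's own header types,
`HeadersMerger` uses the placeholder name from this ticket with a guessed
signature, and `HeaderInheritanceSketch`, `PREFER_TRIGGERING` and
`outputHeaders` are hypothetical names that do not exist in Kafka Streams. The
actual API would be settled in the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified stand-in for Kafka's header types, only to keep the sketch self-contained.
final class SimpleHeader {
    final String key;
    final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical interface: the name and signature are placeholders pending a KIP.
@FunctionalInterface
interface HeadersMerger {
    List<SimpleHeader> merge(List<SimpleHeader> triggering, List<SimpleHeader> stored);
}

final class HeaderInheritanceSketch {
    // Example policy: keep all headers of the triggering record, then append
    // stored headers whose keys the triggering record does not already carry.
    static final HeadersMerger PREFER_TRIGGERING = (triggering, stored) -> {
        List<SimpleHeader> merged = new ArrayList<>(triggering);
        for (SimpleHeader s : stored) {
            boolean present = triggering.stream().anyMatch(t -> t.key.equals(s.key));
            if (!present) {
                merged.add(s);
            }
        }
        return merged;
    };

    // Invoked when a join / aggregate is triggered. The stored headers are
    // passed as a lazy lookup so that the state store is only consulted when
    // a merger was actually configured.
    static List<SimpleHeader> outputHeaders(List<SimpleHeader> triggering,
                                            Supplier<List<SimpleHeader>> storedHeaders,
                                            HeadersMerger merger) {
        if (merger == null) {
            // Today's behavior: inherit from the triggering record only.
            return triggering;
        }
        return merger.merge(triggering, storedHeaders.get());
    }
}
```

Passing the stored headers as a `Supplier` is one way to express the
optimization mentioned in part 2: when no merger is configured, the lookup on
the other side is never performed. How the headers would actually be laid out
in the state store value bytes is left open here.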