Guozhang Wang created KAFKA-7718: ------------------------------------ Summary: Allow customized header inheritance for stateful operators in DSL Key: KAFKA-7718 URL: https://issues.apache.org/jira/browse/KAFKA-7718 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang
As a follow-up work of https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API, we want to provide allow users to customize how record headers are inherited while traversing the topology at the DSL layer (at the lower-level Processor API layer, users are already capable for customizing and inheriting the headers as they forward the records to next processor nodes). Today the headers are implicitly inherited throughout the topology without any modifications within the Streams library. For stateless operators (filter, map, etc) this default inheritance policy should be sufficient. For stateful operators where multiple input records may be generating a single record (i.e. it is an n:1 transformations rather than 1:1 mapping), since we only inherit from the triggering record, which would seem to be a "random" choice to the users and other records' headers are lost. I'd propose we extend DSL to allow users to customize the headers inheritance policy for stateful operators, namely Joins and Aggregations. It would contain two parts: 1) On the DSL layer, I'd suggest we extend `Joined` and `Grouped` control object with an additional function that allows users to pass in a lambda function (let's say its called HeadersMerger, but name subject to discuss over KIP) that takes two Headers object and generated a single Headers object in the return value. 2) On the implementation layer, we need to actually store the headers at the materialized state store so that they can be retrieved along with the record for join / aggregation processor. This would be changing the state store value bytes organization and hence better be considered carefully. Then when join / aggregate processor is triggered, the Headers of both records will be retrieved (one from the triggering record, one read from the materialized state store) and then passed to the HeadersMerger. Some low-hanging optimizations can be considered though, e.g. if users do not have overridden this interface, then we can consider not reading the headers from the other side at all to save IO cost. -- This message was sent by Atlassian JIRA (v7.6.3#76005)