[ https://issues.apache.org/jira/browse/KAFKA-15595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-15595:
---------------------------------------

    Assignee:     (was: Hao Li)

> Session window aggregate drops records headers
> ----------------------------------------------
>
>                 Key: KAFKA-15595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15595
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Abdullah alkhawatrah
>            Priority: Major
>
> Hey,
> While upgrading to 3.5.1 from 3.2.X I noticed a change in SessionWindow
> aggregate behaviour: it seems that custom headers added before the
> aggregate are now dropped.
> I could reproduce the behaviour with the following test topology:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(EARLIEST))
>     .process(() -> new Processor<Object, Object, Object, Object>() {
>         private ProcessorContext<Object, Object> context;
>
>         @Override
>         public void init(final ProcessorContext<Object, Object> context) {
>             this.context = context;
>         }
>
>         @Override
>         public void process(Record<Object, Object> record) {
>             // Attach a custom header before the aggregation step.
>             record.headers().add("key1", record.value().toString().getBytes());
>             context.forward(record);
>         }
>     })
>     .groupByKey()
>     .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
>     .aggregate(() -> 1,
>         (key, value, aggregate) -> aggregate,
>         (aggKey, aggOne, aggTwo) -> aggTwo)
>     .toStream()
>     .map((key, value) -> new KeyValue<>(key.key(), value))
>     .to(outputTopic);
> {code}
> Checking events in the `outputTopic` shows that the headers are empty. With
> 3.2.* the same topology would have propagated the headers.
>
> I can see here:
> [https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L205]
> that a new record is now created, ignoring the headers, while in 3.2.2 the
> same record was forwarded after changing the key and value while keeping the
> headers:
> [https://github.com/apache/kafka/blob/3.2.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L196]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
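
The difference the reporter describes (building a fresh record versus deriving one from the incoming record) can be illustrated with a small self-contained sketch. Everything in it is an assumption made for illustration: `Rec`, `rebuild` and `derive` are hypothetical stand-ins, not the actual Kafka Streams classes or the code at the linked lines. It only models why a three-argument constructor loses headers while a `withKey(...).withValue(...)` chain keeps them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderPropagationSketch {

    // Hypothetical, simplified stand-in for a stream record.
    // This is NOT org.apache.kafka.streams.processor.api.Record.
    static final class Rec<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final Map<String, String> headers;

        // Constructor without headers: the new record starts with none.
        Rec(K key, V value, long timestamp) {
            this(key, value, timestamp, Map.of());
        }

        Rec(K key, V value, long timestamp, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = new LinkedHashMap<>(headers);
        }

        // Copy with a new key; timestamp and headers carry over.
        <NK> Rec<NK, V> withKey(NK newKey) {
            return new Rec<>(newKey, value, timestamp, headers);
        }

        // Copy with a new value; timestamp and headers carry over.
        <NV> Rec<K, NV> withValue(NV newValue) {
            return new Rec<>(key, newValue, timestamp, headers);
        }
    }

    // Pattern reported for 3.5.1: a fresh record is built, so headers are lost.
    static <K, V> Rec<String, Integer> rebuild(Rec<K, V> in, String windowedKey, int agg) {
        return new Rec<>(windowedKey, agg, in.timestamp);
    }

    // Pattern reported for 3.2.2: the incoming record is reused, so headers survive.
    static <K, V> Rec<String, Integer> derive(Rec<K, V> in, String windowedKey, int agg) {
        return in.withKey(windowedKey).withValue(agg);
    }

    public static void main(String[] args) {
        Rec<String, String> in = new Rec<>("user-1", "click", 42L, Map.of("key1", "click"));
        System.out.println("rebuilt headers: " + rebuild(in, "user-1@window", 1).headers);
        System.out.println("derived headers: " + derive(in, "user-1@window", 1).headers);
    }
}
```

Under these assumptions the first line prints an empty header map and the second prints the `key1` header, which matches the behaviour change observed between 3.2.* and 3.5.1.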