Abdullah alkhawatrah created KAFKA-15595:
--------------------------------------------

             Summary: Session window aggregate drops records headers
                 Key: KAFKA-15595
                 URL: https://issues.apache.org/jira/browse/KAFKA-15595
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.5.1
            Reporter: Abdullah alkhawatrah


Hey,

While upgrading to 3.5.1 from 3.2.X I noticed a change in SessionWindow aggregate behaviour: custom headers added before the aggregate now seem to be dropped.

I could reproduce the behaviour with the following test topology:

{code:java}
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(EARLIEST))
    // Attach a custom header to every record before the aggregation.
    .process(() -> new Processor<Object, Object, Object, Object>() {
        private ProcessorContext<Object, Object> context;

        @Override
        public void init(final ProcessorContext<Object, Object> context) {
            this.context = context;
        }

        @Override
        public void process(Record<Object, Object> record) {
            record.headers().add("key1", record.value().toString().getBytes());
            context.forward(record);
        }
    })
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
    // Trivial aggregate: the result value is irrelevant, only the headers matter here.
    .aggregate(() -> 1,
            (key, value, aggregate) -> aggregate,
            (aggKey, aggOne, aggTwo) -> aggTwo)
    .toStream()
    .map((key, value) -> new KeyValue<>(key.key(), value))
    .to(outputTopic);
{code}

Checking events in the `outputTopic` shows that the headers are empty. With 3.2.* the same topology would have propagated the headers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)