[ 
https://issues.apache.org/jira/browse/KAFKA-15595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdullah alkhawatrah updated KAFKA-15595:
-----------------------------------------
    Description: 
Hey,

While upgrading from 3.2.X to 3.5.1, I noticed a change in SessionWindow 
aggregate behaviour: custom headers added before the aggregate now seem to 
be dropped.

I could reproduce the behaviour with the following test topology:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(EARLIEST))
        // Add a custom header to every record before the aggregation.
        .process(() -> new Processor<Object, Object, Object, Object>() {
            private ProcessorContext<Object, Object> context;

            @Override
            public void init(final ProcessorContext<Object, Object> context) {
                this.context = context;
            }

            @Override
            public void process(Record<Object, Object> record) {
                record.headers().add("key1", record.value().toString().getBytes());
                context.forward(record);
            }
        })
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
        // Trivial aggregation: only the headers of the output matter here.
        .aggregate(() -> 1,
                (key, value, aggregate) -> aggregate,
                (aggKey, aggOne, aggTwo) -> aggTwo)
        .toStream()
        // Unwrap the windowed key back to the original key.
        .map((key, value) -> new KeyValue<>(key.key(), value))
        .to(outputTopic);
{code}
Checking events in the `outputTopic` shows that the headers are empty. With 
3.2.* the same topology would have propagated the headers.

 

I can see here: 
[https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L205]
 that a new record is now created without the headers, whereas in 3.2.2 the 
same record was forwarded after its key and value were changed, which kept 
the headers: 
[https://github.com/apache/kafka/blob/3.2.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L196]
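
To illustrate the difference described above, here is a minimal, self-contained sketch. It does not use the Kafka classes: `SimpleRecord` and `HeaderForwardingSketch` are hypothetical stand-ins that only model the two forwarding styles (building a fresh record versus deriving the output from the incoming record), so the actual Kafka Streams code may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Kafka Streams record, reduced to the parts
// relevant to this report. This is NOT the real Kafka class.
final class SimpleRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;
    final Map<String, byte[]> headers;

    // No headers are passed in, so the new record starts with none.
    SimpleRecord(K key, V value, long timestamp) {
        this(key, value, timestamp, new LinkedHashMap<>());
    }

    SimpleRecord(K key, V value, long timestamp, Map<String, byte[]> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new LinkedHashMap<>(headers); // defensive copy
    }

    // Derive a record with a new key; timestamp and headers are carried over.
    <NK> SimpleRecord<NK, V> withKey(NK newKey) {
        return new SimpleRecord<>(newKey, value, timestamp, headers);
    }

    // Derive a record with a new value; timestamp and headers are carried over.
    <NV> SimpleRecord<K, NV> withValue(NV newValue) {
        return new SimpleRecord<>(key, newValue, timestamp, headers);
    }
}

final class HeaderForwardingSketch {
    // Models the behaviour reported for 3.5.1: a fresh record is built from
    // key, value and timestamp only, so the incoming headers are lost.
    static SimpleRecord<String, Integer> forwardAsNewRecord(SimpleRecord<String, String> in, int agg) {
        return new SimpleRecord<>("windowed:" + in.key, agg, in.timestamp);
    }

    // Models the behaviour reported for 3.2.2: the output is derived from the
    // incoming record, so only key and value change and the headers survive.
    static SimpleRecord<String, Integer> forwardSameRecord(SimpleRecord<String, String> in, int agg) {
        return in.withKey("windowed:" + in.key).withValue(agg);
    }

    public static void main(String[] args) {
        SimpleRecord<String, String> in = new SimpleRecord<>("k", "v", 42L);
        in.headers.put("key1", "v".getBytes(StandardCharsets.UTF_8));

        System.out.println("fresh record headers:   " + forwardAsNewRecord(in, 1).headers.size());
        System.out.println("derived record headers: " + forwardSameRecord(in, 1).headers.size());
    }
}
```

In this model the fresh record ends up with no headers while the derived record keeps `key1`, which matches the difference observed between the two versions.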

 


> Session window aggregate drops records headers
> ----------------------------------------------
>
>                 Key: KAFKA-15595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15595
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.5.1
>            Reporter: Abdullah alkhawatrah
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
