[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Sheppard updated KAFKA-13289:
-------------------------------------
    Description: 
When pushing bulk data through a kafka-streams app, I see it log the following 
message many times...

`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment.`

...and data which I expect to have been joined through a leftJoin step appears 
to be lost.

I've seen this in practice either when my application has been shut down for a 
while and then is brought back up, or when I've used something like the 
[app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
 in an attempt to have the application reprocess past data.

I was able to reproduce this behaviour in isolation by generating 1000 messages 
to two topics spaced an hour apart (with the original timestamps in order), 
then having kafka streams select a key for them and try to leftJoin the two 
rekeyed streams.

Self-contained source code for that reproduction is available at 
https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java

The actual kafka-streams topology in there looks like this.


{code:java}
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> leftStream = builder.stream(leftTopic);
            final KStream<String, String> rightStream = builder.stream(rightTopic);

            final KStream<String, String> rekeyedLeftStream = leftStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            final KStream<String, String> rekeyedRightStream = rightStream
                    .selectKey((k, v) -> v.substring(0, v.indexOf(":")));

            JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

            final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
                    rekeyedRightStream,
                    (left, right) -> left + "/" + right,
                    joinWindow
            );
{code}


...and the eventual output I produce looks like this...

{code}
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
{code}

...whereas, given the input data, I expect to see every row end with the two 
values joined, rather than the right value being null.

Note that I understand we initially get left/null results for many records, 
since that's the expected semantics of a kafka-streams left join, at least 
until the improved left/outer stream-stream join (v3.1.x and newer) described at 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious

I've noticed that if I set a very large grace value on the join window the 
problem is solved, but since the input I provide is not out of order I did not 
expect to need to do that, and I'm wary of the resource requirements of doing 
so in practice on an application with a lot of volume.

My suspicion is that something is happening such that when one partition is 
processed it causes the stream time to be pushed forward to the newest message 
in that partition, meaning when the next partition is then examined it is found 
to contain many records which are 'too old' compared to the stream time. 
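
To make that suspicion concrete, here is a minimal sketch in plain Java (no 
Kafka dependency). It is a simplified model, not the actual kafka-streams 
implementation: it assumes stream time is the highest timestamp seen so far and 
that a record is skipped when it is older than stream time minus a retention 
period. The class name, the method names, the 10 second retention and the 
round-robin assignment of records to two partitions are all hypothetical and 
only there for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamTimeSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Counts records that are older than (stream time - retention) at the
    // moment they are processed. Stream time is modelled as the highest
    // timestamp observed so far (an assumption of this sketch).
    public static int countExpired(List<Long> timestamps, long retentionMs) {
        long streamTime = Long.MIN_VALUE;
        int expired = 0;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            if (ts < streamTime - retentionMs) {
                expired++;
            }
        }
        return expired;
    }

    // Processing order when each partition is drained completely before the
    // next one. Record i has timestamp i hours and lives in partition
    // i % partitions (hypothetical round-robin assignment).
    public static List<Long> partitionByPartition(int count, int partitions) {
        List<Long> order = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            for (int i = p; i < count; i += partitions) {
                order.add(i * HOUR_MS);
            }
        }
        return order;
    }

    // Processing order when records are interleaved by timestamp.
    public static List<Long> inTimestampOrder(int count) {
        List<Long> order = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            order.add(i * HOUR_MS);
        }
        return order;
    }

    public static void main(String[] args) {
        long shortRetentionMs = 10_000L;        // hypothetical: small window, no grace
        long longRetentionMs = 1000L * HOUR_MS; // hypothetical: very large grace

        // One partition drained first: most of the second partition is "too old".
        System.out.println(countExpired(partitionByPartition(1000, 2), shortRetentionMs)); // 499
        // Same records interleaved by timestamp: nothing is skipped.
        System.out.println(countExpired(inTimestampOrder(1000), shortRetentionMs));        // 0
        // Partition by partition again, but with a very long retention.
        System.out.println(countExpired(partitionByPartition(1000, 2), longRetentionMs));  // 0
    }
}
```

In this model the input is in timestamp order within each partition, yet 
almost every record of the second partition is skipped. A very long retention 
hides the problem, which would be consistent with what I see when I set a 
large grace value.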

I ran across 
https://kafkacommunity.blogspot.com/2020/02/re-skipping-record-for-expired-segment_88.html
 from a year and a half ago which seems to describe the same problem, but I'm 
hoping the self-contained reproduction might make the issue easier to tackle!


> Bulk processing data through a join with kafka-streams results in `Skipping 
> record for expired segment`
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13289
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Matthew Sheppard
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
