[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430275#comment-17430275 ]
Eugen Dück edited comment on KAFKA-13289 at 10/19/21, 6:33 AM:
---------------------------------------------------------------

We are running into similar issues (6.0.1-ccs for broker and kafka-streams library, i.e. Kafka 2.6.1):
 * lots of "Skipping record for expired segment." warnings in AbstractRocksDBSegmentedBytesStore
 * at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried removing the re-keying part from Matthew's code (roughly as sketched below), and as far as I can tell the problem still persists, so it does not look like it is related to re-partitioning. Btw., the problem shows up even with just 10 instead of 1000 messages per topic.
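For concreteness, this is roughly what the topology reduces to once the re-keying is removed. This is a sketch of my reading of the fork, not its exact code: the topic names are placeholders, and the input topics are assumed to be keyed by the join key already.

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class NoRekeyJoinSketch {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // No selectKey()/re-partitioning step: the input topics are assumed
        // to already be keyed by the join key.
        final KStream<String, String> leftStream = builder.stream("leftTopic");
        final KStream<String, String> rightStream = builder.stream("rightTopic");

        // Same join as Matthew's reproduction, minus the re-keying.
        final KStream<String, String> joined = leftStream.leftJoin(
                rightStream,
                (left, right) -> left + "/" + right,
                JoinWindows.of(Duration.ofSeconds(5)));

        joined.to("outputTopic");
        return builder.build();
    }
}
{code}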
Find my fork of Matthew's code here: [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{noformat}
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running ins14809.Ins14809Test
leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 2:left, 8:left]
rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 9:right, 8:right, 6:right]
# Actual results
We want to see every number X below end with an entry that says [X,left/X,right]
but in practice we often see only [X,left/null] meaning the data was not joined.
This seems to coincide with kafka streams writing...
`WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment`
...to its logs, in spite of the fact that the source message timestamps were in order when
kafka streams got them.
0 [0:left/null, 0:left/0:right]
1 [1:left/1:right]
2 [2:left/2:right]
3 [3:left/null, 3:left/3:right]
4 [4:left/null, 4:left/4:right]
5 [5:left/5:right]
6 [6:left/null, 6:left/6:right]
7 [7:left/7:right]
8 [8:left/8:right]
9 [9:left/9:right]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 43.267 s - in ins14809.Ins14809Test
{noformat}


> Bulk processing correctly ordered input data through a join with
> kafka-streams results in `Skipping record for expired segment`
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13289
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Matthew Sheppard
>            Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following message many times...
>
> {noformat}
> WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.
> {noformat}
>
> ...and data which I expect to have been joined through a leftJoin step appears to be lost.
>
> I've seen this in practice either when my application has been shut down for a while and then brought back up, or when I've used something like the [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) in an attempt to have the application reprocess past data.
>
> I was able to reproduce this behaviour in isolation by generating 1000 messages to two topics spaced an hour apart (with the original timestamps in order), then having kafka streams select a key for them and try to leftJoin the two rekeyed streams.
>
> Self-contained source code for that reproduction is available at https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
>
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
>
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
>
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
>
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
>
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
>
> ...and the eventual output I produce looks like this...
>
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
>
> ...whereas, given the input data, I expect to see every row end with the two values joined, rather than the right value being null.
>
> Note that I understand it's expected that we initially get the left/null values for many entries, since that's the expected semantics of a kafka-streams left join, at least until the improved left/outer stream-stream join semantics (v3.1.x and newer) remove those spurious results: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)
>
> I've noticed that if I set a very large grace value on the join window the problem is solved (sketched below), but since the input I provide is not out of order I did not expect to need to do that, and I'm wary of the resource requirements of doing so in practice in an application with a lot of volume.
>
> My suspicion is that something is happening such that when one partition is processed it causes the stream time to be pushed forward to the newest message in that partition, meaning that when the next partition is then examined it is found to contain many records which are 'too old' compared to the stream time.
>
> I ran across this discussion thread, which seems to cover the same issue:
> http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e
> ...which included a request from [~cadonna] for a reproduction case, so I'm hoping my example above might make the issue easier to tackle!
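As a footnote to the large-grace observation in the description, here is a minimal sketch of what that workaround looks like against the 2.8-era JoinWindows API. The one-day value is an arbitrary illustration, not a recommendation; a larger grace period means window segments (and thus state) are retained longer.

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.kstream.JoinWindows;

public class GraceWorkaroundSketch {

    public static void main(String[] args) {
        // Same 5-second join window as the reproduction, but with a large
        // grace period so the window segments are kept around long after
        // stream time has passed them. Records arriving "late" relative to
        // stream time are then still joined instead of being dropped with
        // "Skipping record for expired segment".
        JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5))
                .grace(Duration.ofDays(1));

        System.out.println("window size (ms):  " + joinWindow.size());
        System.out.println("grace period (ms): " + joinWindow.gracePeriodMs());
    }
}
{code}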