[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17431059#comment-17431059 ]

Eugen Dück edited comment on KAFKA-13289 at 10/21/21, 12:15 AM:
----------------------------------------------------------------

[~msheppard] You are right - my bad!

I have changed the number of messages back to 1000, and this is what I am 
getting (still using my non-re-keying version):

{{ 0 [0:left/null, 0:left/0:right]}}
{{ 1 [1:left/null, 1:left/1:right]}}
{{ 2 [2:left/null, 2:left/2:right]}}
{{ 3 [3:left/null, 3:left/3:right]}}
{{ 4 [4:left/null, 4:left/4:right]}}
{{ 5 [5:left/null, 5:left/5:right]}}
{{ 6 [6:left/null, 6:left/6:right]}}
{{...}}
{{688 [688:left/null, 688:left/688:right]}}
{{689 [689:left/null, 689:left/689:right]}}
{{691 [691:left/null, 691:left/691:right]}}
{{692 [692:left/null, 692:left/692:right]}}
{{694 [694:left/null, 694:left/694:right]}}
{{695 [695:left/null, 695:left/695:right]}}
{{696 [696:left/null, 696:left/696:right]}}
{{697 [697:left/null, 697:left/697:right]}}
{{701 [701:left/null, 701:left/701:right]}}

So it starts with consecutive numbers, but later there are gaps, and the output
stops at 701. I ran the same test three times, with exactly the same gaps (as
far as I checked and pasted into this comment) and the same last number 701, so
the outcome appears to be deterministic. I assume the expected output would
contain every number from 0 up to 999, so this is not the expected behavior.
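
For clarity, here is roughly what I mean by the non-re-keying version (a
minimal sketch; the topic names and window size are assumed to match the
Ins14809Test reproduction):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

// Same join as the original reproduction, but without the selectKey()
// re-keying step: both topics are joined on their original keys.
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> leftStream = builder.stream(leftTopic);
final KStream<String, String> rightStream = builder.stream(rightTopic);
final KStream<String, String> joined = leftStream.leftJoin(
        rightStream,
        (left, right) -> left + "/" + right,
        JoinWindows.of(Duration.ofSeconds(5))
);
{code}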



> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13289
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Matthew Sheppard
>            Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following
> message many times...
> {noformat}
> WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool|https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html]
> in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
>             final StreamsBuilder builder = new StreamsBuilder();
>             final KStream<String, String> leftStream = builder.stream(leftTopic);
>             final KStream<String, String> rightStream = builder.stream(rightTopic);
>             final KStream<String, String> rekeyedLeftStream = leftStream
>                     .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
>             final KStream<String, String> rekeyedRightStream = rightStream
>                     .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
>             JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
>             final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>                     rekeyedRightStream,
>                     (left, right) -> left + "/" + right,
>                     joinWindow
>             );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two
> values joined, rather than the right value being null.
> Note that I understand the initial left/null results for many keys are
> expected, since those are the semantics of the kafka-streams left join, at
> least until
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the
> problem is solved, but since the input I provide is not out of order I did
> not expect to need to do that, and I'm wary of the resource requirements of
> doing so in practice on an application with a lot of volume.
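> For reference, a minimal sketch of that workaround (the two-hour grace value
> here is an assumption for illustration, not a recommendation):
> {code:java}
> import java.time.Duration;
> import org.apache.kafka.streams.kstream.JoinWindows;
>
> // Same 5-second join window as above, but with an explicit grace period so
> // records that arrive after stream time has advanced are still joined
> // rather than dropped as expired.
> JoinWindows joinWindowWithGrace = JoinWindows
>         .of(Duration.ofSeconds(5))
>         .grace(Duration.ofHours(2));
> {code}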
> My suspicion is that when one partition is processed, stream time is pushed
> forward to the newest message in that partition, so that when the next
> partition is examined, many of its records are considered 'too old' relative
> to the stream time.
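> To illustrate that suspicion, here is a simplified sketch (assumed behavior,
> not the actual Kafka Streams source) of the kind of check that produces the
> "Skipping record for expired segment" message:
> {code:java}
> // Hypothetical model of a segmented window store's expiry check. Stream
> // time only moves forward, so once records from one partition advance it,
> // older (but in-order) records from another partition can fall behind the
> // retention horizon and get dropped.
> class ExpiryCheckSketch {
>     private long observedStreamTime = Long.MIN_VALUE;
>     private final long retentionMs; // roughly window size + grace
>
>     ExpiryCheckSketch(long retentionMs) {
>         this.retentionMs = retentionMs;
>     }
>
>     boolean accepts(long recordTimestampMs) {
>         observedStreamTime = Math.max(observedStreamTime, recordTimestampMs);
>         return recordTimestampMs >= observedStreamTime - retentionMs;
>     }
> }
> {code}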
> I ran across this discussion thread, which seems to cover the same issue:
> http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e
> It includes a request from [~cadonna] for a reproduction case, so I'm hoping
> my example above might make the issue easier to tackle!


