[ https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17319951#comment-17319951 ]

Hans-Peter Grahsl commented on KAFKA-12608:
-------------------------------------------

Hi, I haven't had time to investigate much deeper yet, but I have the
following *unverified* *assumption* about what might be going wrong with
[~jamii]'s example:

1) From what I can see in the code of the demo scenario, the KStreams app
uses a custom timestamp (TS) extractor.

2) The original input data (transactions topic) processed by KStreams
contains payload TS values that are considerably older than the default topic
retention period. KStreams extracts this payload TS and uses it as the
timestamp of the records it produces into the target topic
(accepted_transactions), and I think herein lies the problem.

3) This might explain the intermittent "phenomenon" described: in some runs,
consuming the resulting data from the KStreams target topic
(accepted_transactions) no longer shows all records because, in the
meantime, the topic cleanup routine has deleted a batch of records in the
background to enforce the retention policy.

4) That said, I would assume that any one of the following changes will lead
to a properly working example:
a) removing the custom TS extractor from your KStreams code
b) using payload TS values for your demo data that are clearly within the
retention period
c) extending the topic's retention time so that it is long enough not to
trigger time-based retention in your target topic
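To make points 2) and 3) concrete, here is a minimal sketch of the check the broker applies for time-based retention: data becomes eligible for deletion once "now" minus its timestamp exceeds `retention.ms`. It assumes epoch-millisecond timestamps, the broker default of `log.retention.hours=168` (7 days), and that the target topic keeps the default `message.timestamp.type=CreateTime`, so the timestamp Streams sets on each output record is the one the broker evaluates. The helper name `is_outside_retention` is hypothetical, and the broker actually applies the test to the largest timestamp of a whole log segment rather than to individual records.

{code:bash}
#!/usr/bin/env bash
# Broker default for time-based retention: log.retention.hours=168, i.e. 7 days in ms.
DEFAULT_RETENTION_MS=$((7 * 24 * 60 * 60 * 1000))   # 604800000

# Hypothetical helper: is_outside_retention TS_MS NOW_MS [RETENTION_MS]
# Prints "expired" when NOW_MS - TS_MS exceeds RETENTION_MS, else "retained".
# (Kafka evaluates this against the largest timestamp in a log segment,
# so a record is removed together with the rest of its segment.)
is_outside_retention() {
  ts_ms=$1
  now_ms_arg=$2
  retention_ms=${3:-$DEFAULT_RETENTION_MS}
  if [ $((now_ms_arg - ts_ms)) -gt "$retention_ms" ]; then
    echo "expired"
  else
    echo "retained"
  fi
}

# Example: a payload timestamp from 30 days ago versus one from 1 hour ago.
now_ms=$(($(date +%s) * 1000))
day_ms=$((24 * 60 * 60 * 1000))
is_outside_retention $((now_ms - 30 * day_ms)) "$now_ms"     # prints: expired
is_outside_retention $((now_ms - 60 * 60 * 1000)) "$now_ms"  # prints: retained
{code}

For option c), the per-topic override is the `retention.ms` topic config (a value of `-1` disables time-based deletion), which can be changed with the `kafka-configs.sh` tool's `--alter --entity-type topics --add-config` options.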

Again, I might be wrong, as I haven't had time to look into this any deeper
yet. If I am, somebody else like [~mjsax] will need to go down the rabbit
hole ;)

 

> Simple identity pipeline sometimes loses data
> ---------------------------------------------
>
>                 Key: KAFKA-12608
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12608
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>         Environment: 
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix
> [nix-shell:~/streaming-consistency/kafka-streams]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos 
>            Reporter: Jamie Brandon
>            Priority: Major
>
> I'm running a very simple streams program that reads records from one topic 
> into a table and then writes the stream back into another topic. In about 1 
> in 5 runs, some of the output records are missing. They tend to form a single 
> contiguous range, as if a single batch was dropped somewhere.
> https://github.com/jamii/streaming-consistency/blob/main/kafka-streams/src/main/java/Demo.java#L49-L52
> {code:bash}
> $ wc -l tmp/*transactions
>  999514 tmp/accepted_transactions
>  1000000 tmp/transactions
>  1999514 total
> $ cat tmp/transactions | cut -d',' -f 1 | cut -d' ' -f 2 > in
> $ cat tmp/accepted_transactions | cut -d',' -f 1 | cut -d':' -f 2 > out
> $ diff in out | wc -l
>  487
> $ diff in out | head
>  25313,25798d25312
>  < 25312
>  < 25313
>  < 25314
>  < 25315
>  < 25316
>  < 25317
>  < 25318
>  < 25319
>  < 25320
>  
> $ diff in out | tail
>  < 25788
>  < 25789
>  < 25790
>  < 25791
>  < 25792
>  < 25793
>  < 25794
>  < 25795
>  < 25796
>  < 25797
> {code}
> I've run the consumer multiple times to make sure that the records are
> actually missing from the topic and that it wasn't just a hiccup in the
> consumer.
> The repo linked above has instructions in the readme on how to reproduce the 
> exact versions used. 
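To check the "single contiguous range" observation without paging through raw `diff` output, the missing IDs can be collapsed into runs of consecutive input lines. This is a sketch that assumes `in` and `out` are the one-ID-per-line files produced by the `cut` pipelines in the description; the function name `missing_ranges` is hypothetical. Unlike `diff`, it does not require `out` to be in the same order as `in`.

{code:bash}
#!/usr/bin/env bash
# Hypothetical helper: missing_ranges IN_FILE OUT_FILE
# Prints each run of consecutive IN_FILE lines whose ID never appears in OUT_FILE
# as "first_id-last_id (N missing)".
missing_ranges() {
  awk '
    FILENAME == ARGV[1] { seen[$1] = 1; next }  # pass 1 (OUT_FILE): IDs that reached the output
    !($1 in seen) {                             # pass 2 (IN_FILE): this input ID is missing
      if (started && FNR == prev_line + 1) {
        last = $1                               # extend the current run of adjacent missing lines
      } else {
        if (started) printf "%s-%s (%d missing)\n", first, last, count
        first = last = $1; count = 0; started = 1
      }
      prev_line = FNR; count++
    }
    END { if (started) printf "%s-%s (%d missing)\n", first, last, count }
  ' "$2" "$1"
}

# Usage: missing_ranges in out
{code}

Given the `diff` output above (one hunk covering input lines 25313 to 25798), this should print a single line, `25312-25797 (486 missing)`, matching the 1000000 - 999514 = 486 record difference.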



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
