[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121149#comment-17121149
 ] 

Boyang Chen commented on KAFKA-9891:
------------------------------------

[~mateuszjadczyk] If we commit keyOne, then it *should* be materialized inside 
the standby state store copy, and the duplicate key check will see it, correct? 
Will sync with [~guozhang] on whether to backport this checkpoint lock to 2.4.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>         Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  * a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. The first node in the topology checks the state store 
> (_COMMAND_ID_STORE_) for a duplicate; if none is found, it puts the command 
> id in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
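The check-then-put step described in the flow above can be sketched in plain Java. This is only a stand-in for the windowed store: the class and method names are hypothetical and it does not use the Kafka Streams API. It shows that the command id is written before any downstream node runs, so the write must be rolled back together with the transaction for a retry to behave correctly.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the windowed command id store (hypothetical names,
// not the Kafka Streams API).
final class CommandIdDedupSketch {
    // command id -> time it was first recorded
    private final Map<String, Long> seenAtMillis = new HashMap<>();
    private final long retentionMillis;

    CommandIdDedupSketch(Duration retention) {
        this.retentionMillis = retention.toMillis();
    }

    // Returns true if the id was recorded within the retention window.
    // Otherwise records it and returns false, before any later processing step.
    boolean checkAndRecord(String commandId, long nowMillis) {
        Long seenAt = seenAtMillis.get(commandId);
        if (seenAt != null && nowMillis - seenAt < retentionMillis) {
            return true;
        }
        seenAtMillis.put(commandId, nowMillis);
        return false;
    }
}
```

If the record written by `checkAndRecord` survives an aborted transaction, the next attempt to process the same command reports a duplicate, which is the symptom described in the last step.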
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] 
> c.g.f.c.s.validation.CommandIdValidator : CommandId: 
> mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.StoreChangelogReader : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found 
> checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for 
> store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] 
> c.g.f.c.s.validation.CommandIdValidator : CommandId: 
> mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.AssignedStreamsTasks : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to 
> process stream task 1_2 due to the following error: 
> java.lang.RuntimeException
> NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.StoreChangelogReader : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found 
> checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for 
> store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] 
> c.g.f.c.s.validation.CommandIdValidator : Command duplicate 
> detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
> {code}
> It seems that on NODE_3 a standby task 1_2 runs on T-2 and replicates the 
> first valid command, thus creating a checkpoint file. The invalid command 
> causes an error on NODE_1, then NODE_3 T-2 takes over the task. It finds the 
> checkpoint file (which is fine) and starts to process the invalid command. 
> It crashes, T-1 on the same node takes over, finds the checkpoint file (!), 
> assumes the state store is clean (it is not, as it contains state modified 
> by T-2) and detects a duplicated command id.
>  
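The sequence described above can be reproduced with a toy model. The sketch below is an assumption-laden illustration in plain Java, not Kafka Streams code: the class, its fields, and the `trustCheckpoint` flag are all hypothetical. It models a changelog that holds only committed records, a local store that keeps uncommitted writes after a failure, and a checkpoint offset that restoration may or may not trust.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of one task's state on a single node; every name is hypothetical.
final class CheckpointTrustSketch {
    final List<String> changelog = new ArrayList<>(); // committed records only
    final Set<String> localStore = new HashSet<>();   // local store, survives a thread failure
    Integer checkpoint = null;                        // changelog offset the store claims to reflect

    // Standby path: copy committed records and write a checkpoint.
    void standbyReplicate() {
        localStore.addAll(changelog);
        checkpoint = changelog.size();
    }

    // Restore before processing. Trusting the checkpoint keeps whatever is in
    // the local store; otherwise the store is wiped and rebuilt from offset 0.
    void restore(boolean trustCheckpoint) {
        int from = 0;
        if (trustCheckpoint && checkpoint != null) {
            from = checkpoint;
        } else {
            localStore.clear();
        }
        localStore.addAll(changelog.subList(from, changelog.size()));
    }

    // Dedup check plus put, followed by a simulated downstream failure: the
    // transaction aborts, so nothing reaches the changelog, but the local
    // write and the checkpoint stay as they are. Returns true on "duplicate".
    boolean processAndAbort(String commandId) {
        return !localStore.add(commandId);
    }
}
```

Replaying the report with this model: after `standbyReplicate()` the checkpoint is 1; the first `restore(true)` followed by `processAndAbort` sees no duplicate; the second `restore(true)` still starts at checkpoint 1, keeps the aborted write, and reports a duplicate. Calling `restore(false)` instead rebuilds the store from the changelog and the duplicate disappears. This is one possible remedy shown for illustration only, not a statement about how the actual fix works.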
> We use Java 11, kafka clients 2.4.1 and spring-kafka 2.4.5. We rolled back for 
> a moment to kafka clients 2.3.1 and the problem persisted.
> *We performed more tests with configuration changes and after changing 
> `num.standby.replicas = 1` to `num.standby.replicas = 0` the problem 
> disappeared. It is also resolved when changing the store to 
> _inMemoryWindowStore._*
> In the SO question you can find the relevant java code. I don't have a sample 
> project to share at the moment which replicates the problem, but it is easily 
> repeatable in our project.
> Such behaviour can have serious implications for business logic: in our case, 
> messages were silently skipped without being properly processed.
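
For reference, the configuration change mentioned in the report (standby replicas set to 0 while keeping exactly-once processing) might look like the following when the settings are built as plain `java.util.Properties`. The class and method names are hypothetical; only the two property keys and their values come from the report. Disabling standby replicas is a workaround rather than a fix, since standbys exist to shorten state restoration after a task migrates.

```java
import java.util.Properties;

// Hypothetical helper that builds only the two settings discussed in the report.
final class StandbyWorkaroundConfigSketch {
    static Properties streamsProperties() {
        Properties props = new Properties();
        // Exactly-once processing, as in the affected application.
        props.put("processing.guarantee", "exactly_once");
        // Workaround from the report: no standby replicas.
        props.put("num.standby.replicas", "0");
        return props;
    }
}
```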



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
