[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100020#comment-17100020 ]

Boyang Chen commented on KAFKA-9891:
------------------------------------

We had some offline discussion and plan to revise the integration test PR as 
well. We want to test it on trunk first and then backport it to 2.4 to ensure 
the regression doesn't carry over.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from the Kafka Streams examples) based on Kafka Streams state stores. It stores 
> command ids from the past hour in a _persistentWindowStore_. We encountered a 
> problem with the store when an exception is thrown later in that topology.
> We run 3 nodes using Docker, each with multiple threads configured for this 
> particular Streams application.
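The check-then-put logic performed by the first node can be modelled without Kafka at all. In the sketch below, the class name `CommandIdDeduplicator` is hypothetical, and a `HashMap` with a one-hour retention check stands in for the _persistentWindowStore_; it is an assumption for illustration, not the Kafka Streams `WindowStore` API or the reporter's actual code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the command-id deduplication check.
 * A HashMap stands in for COMMAND_ID_STORE; this is NOT the
 * Kafka Streams WindowStore API.
 */
class CommandIdDeduplicator {
    // Assumed retention: ids older than one hour no longer count as duplicates.
    static final long RETENTION_MS = 60L * 60L * 1000L;

    // commandId -> timestamp (ms) at which it was first recorded
    private final Map<String, Long> seen = new HashMap<>();

    /**
     * Returns true if the id was already recorded within the retention window.
     * Otherwise records it (the "put" step) and returns false so processing continues.
     */
    boolean isDuplicate(String commandId, long nowMs) {
        Long firstSeen = seen.get(commandId);
        if (firstSeen != null && nowMs - firstSeen < RETENTION_MS) {
            return true;
        }
        seen.put(commandId, nowMs);
        return false;
    }
}
```

Calling `isDuplicate` twice with the same id inside the hour returns `false` and then `true`, which is exactly the behaviour the flow below relies on.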
> The business flow is as follows (performed within a single subtopology):
> * A valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. The first node in the topology checks whether this is a duplicate by 
> looking it up in the state store (_COMMAND_ID_STORE_); if it is not, it puts the 
> command id into the state store and processes the command as usual.
> * An invalid command is sent with the same key but a new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, the duplicate check 
> is performed; it is not a duplicate, so the command id is put into the state 
> store. The next node in the topology throws an exception, which causes an 
> error on NODE 1 for task 1_2. As a result, the transaction is aborted and 
> offsets are not committed. I double-checked the changelog topic: the 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id, _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_, and 
> not the one that caused the failure.
> * In the meantime, standby task 1_2 running on NODE 3 replicated the 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into its local 
> _COMMAND_ID_STORE_.
> * Standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks whether this command id is a duplicate (it isn't), tries to process 
> the faulty command, and throws an exception. Again, the transaction is aborted; 
> everything looks fine.
> * NODE 3 Thread-1 takes over. It checks for a duplicate. To our surprise, 
> *it is a duplicate!* This happens even though the transaction was aborted and 
> the changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_.
>  
> After digging into the Streams logs and some discussion on 
> [Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan],
> we concluded that it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files:
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error: java.lang.RuntimeException
> NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
> {code}
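The `OffsetCheckpoint` TRACE lines above show a two-step write: first `.checkpoint.tmp`, then a swap over `.checkpoint`. A minimal sketch of that write-then-rename pattern follows. The class `CheckpointFileSketch` and the file layout (a version line, an entry count, then one `topic partition offset` line per entry) are assumptions for illustration, not a copy of Kafka's implementation.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a checkpoint file written via a temp file plus rename. */
class CheckpointFileSketch {
    // Assumed format version written as the first line.
    static final int VERSION = 0;

    /** Writes offsets to "<checkpoint>.tmp", then swaps it over the real file. */
    static void write(Path checkpoint, Map<String, Long> offsets) throws IOException {
        Path tmp = checkpoint.resolveSibling(checkpoint.getFileName() + ".tmp");
        try (BufferedWriter w = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            w.write(VERSION + "\n");
            w.write(offsets.size() + "\n");
            // Each key is "<changelog topic> <partition>", e.g.
            // "XXXXProcessor-COMMAND_ID_STORE-changelog 2".
            for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
                w.write(e.getKey() + " " + e.getValue() + "\n");
            }
        }
        // The "swap": a rename, so a reader never sees a half-written checkpoint.
        try {
            Files.move(tmp, checkpoint, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(tmp, checkpoint, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Returns an empty map when no checkpoint exists, i.e. "restore from scratch". */
    static Map<String, Long> read(Path checkpoint) throws IOException {
        Map<String, Long> offsets = new TreeMap<>();
        if (!Files.exists(checkpoint)) {
            return offsets;
        }
        List<String> lines = Files.readAllLines(checkpoint, StandardCharsets.UTF_8);
        int count = Integer.parseInt(lines.get(1).trim());
        for (int i = 0; i < count; i++) {
            String line = lines.get(2 + i);
            int split = line.lastIndexOf(' ');
            offsets.put(line.substring(0, split), Long.parseLong(line.substring(split + 1)));
        }
        return offsets;
    }
}
```

An empty result corresponds to the `Checkpointable offsets read from checkpoint: {}` lines. Once a checkpoint with offset 1 exists, a later owner restores "at checkpoint 1", meaning it treats the local store as already up to date through that offset.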
> It seems that on NODE_3 there is a standby task 1_2 running on T-2 that 
> replicates the first, valid command, thus creating a checkpoint file. The 
> invalid command causes an error on NODE_1, and then NODE_3 T-2 takes over the 
> task. It finds the checkpoint file (which is fine) and starts to process the 
> invalid command. It crashes, and T-1 on the same node takes over, finds the 
> checkpoint file (!), assumes the state store is clean (apparently it is not, 
> as it contains state modified by T-2), and finds a duplicated command id.
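The hypothesis above can be reduced to a toy model: a write to the persistent local store is not undone when the transaction aborts, and a new owner that finds a checkpoint reuses that store instead of rebuilding it from the committed changelog. The class `AbortedWriteModel` is hypothetical and heavily simplified (no caching, no RocksDB, a single partition); it only illustrates why reusing dirty local state yields a false duplicate while wiping the store and restoring from the changelog does not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical toy model of the reported scenario; not Kafka Streams code.
 * "changelog" holds only committed records; "localStore" stands in for the
 * on-disk store that survives when another thread on the same node takes over.
 */
class AbortedWriteModel {
    final List<String> changelog = new ArrayList<>();
    final Set<String> localStore = new HashSet<>();
    boolean checkpointExists = false;

    /** Processes one command in a transaction that aborts if a downstream node fails. */
    void process(String commandId, boolean failsDownstream) {
        List<String> txBuffer = new ArrayList<>();
        localStore.add(commandId);   // in this model the store write lands locally at once
        txBuffer.add(commandId);     // and is buffered for the changelog transaction
        if (failsDownstream) {
            return;                  // abort: txBuffer is dropped, localStore is NOT rolled back
        }
        changelog.addAll(txBuffer);  // commit
        checkpointExists = true;     // a checkpoint now points at committed data
    }

    /** A new owner takes over the task on the same node. */
    void takeOver(boolean wipeDirtyState) {
        if (wipeDirtyState || !checkpointExists) {
            // Rebuild from committed changelog records only.
            localStore.clear();
            localStore.addAll(changelog);
        }
        // Otherwise the checkpoint is trusted and local state is reused as-is.
    }

    boolean isDuplicate(String commandId) {
        return localStore.contains(commandId);
    }
}
```

Replaying the report's sequence (valid command committed, invalid command aborted, then `takeOver(false)`) makes `isDuplicate` return `true` for the aborted id; `takeOver(true)` makes it return `false` again.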
>  
> We use Java 11, kafka clients 2.4.1 and spring-kafka 2.4.5. We briefly rolled 
> back to kafka clients 2.3.1 and the problem persisted.
> *We performed more tests with configuration changes, and after changing 
> `num.standby.replicas = 1` to `num.standby.replicas = 0` the problem 
> disappeared. It is also resolved by changing the store to 
> _inMemoryWindowStore_.*
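The standby workaround, expressed as configuration. This sketch uses plain `java.util.Properties` with the standard Kafka Streams config keys so it compiles without Kafka on the classpath; the class `WorkaroundConfig`, its method, and the argument values are hypothetical placeholders. In a real application the `StreamsConfig` constants would normally be used instead of string literals.

```java
import java.util.Properties;

/** Hypothetical helper showing the reported workaround as Streams properties. */
class WorkaroundConfig {
    static Properties streamsProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Exactly-once processing, as used in the report.
        props.put("processing.guarantee", "exactly_once");
        // Workaround from the report: no standby replicas (previously 1).
        props.put("num.standby.replicas", "0");
        return props;
    }
}
```

Disabling standbys gives up fast failover: a migrated task has to restore its store from the changelog before it can resume processing.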
> In the SO question you can find the relevant Java code. I don't have a sample 
> project that replicates the problem to share at the moment, but it is easily 
> reproducible in our project.
> Such behaviour can have serious implications for business logic; in our case, 
> messages were accidentally skipped without being processed properly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
