[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092295#comment-17092295 ]
Boyang Chen commented on KAFKA-9891:
------------------------------------

I tried to replicate the scenario with an integration test, but it did not fail on 2.4. Could you take a look and see whether my implementation replicates what you suggested?

> Invalid state store content after task migration with exactly_once and standby replicas
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one from the Kafka Streams examples) based on Kafka Streams state stores. It stores command ids from the past hour in a _persistentWindowStore_. We encountered a problem with the store when an exception is thrown later in that topology.
> We run 3 nodes using docker, each with multiple threads set for this particular Streams application.
> The business flow is as follows (performed within a single subtopology):
> * a valid command is sent with a command id (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active task 1_2. The first node in the topology checks the state store (_COMMAND_ID_STORE_) to see whether this is a duplicate; if not, it puts the command id in the state store and processes the command properly.
> * an invalid command is sent with the same key but a new command id (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, the check for a duplicated command id is performed; it's not a duplicate, so the command id is put into the state store. The next node in the topology throws an exception, which causes an error on NODE 1 for task 1_2. As a result, the transaction is aborted and offsets are not committed. I double-checked the changelog topic: the relevant messages are not committed.
> Therefore, the changelog topic contains only the first command id, _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_, and not the one which caused the failure.
> * in the meantime, a standby task 1_2 running on NODE 3 replicated the _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local _COMMAND_ID_STORE_
> * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It checks whether this command id is a duplicate (it isn't), tries to process the faulty command and throws an exception. Again, the transaction is aborted and all looks fine.
> * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, *it is a duplicate!* Even though the transaction has been aborted and the changelog doesn't contain this command id: _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_.
>
> After digging into the Streams logs and some discussion on [Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan], we concluded it has something to do with checkpoint files. Here are the detailed logs relevant to checkpoint files.
>
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error: java.lang.RuntimeException
> NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
> {code}
> It seems that on NODE_3 there's a standby task 1_2 running on T-2; it replicates the first valid command, thus creating a checkpoint file. The invalid command causes an error on NODE_1, then NODE_3 T-2 takes over the task. It finds the checkpoint file (which is fine) and starts to process the invalid command. It crashes, T-1 on the same node takes over, finds the checkpoint file (!), assumes the state store is clean (apparently it's not, as it contains state modified by T-2) and finds a duplicated command id.
>
> We use Java 11, kafka clients 2.4.1 and spring-kafka 2.4.5. We rolled back for a moment to kafka clients 2.3.1 and the problem persists.
> *We performed more tests with configuration changes, and after changing `num.standby.replicas = 1` to `num.standby.replicas = 0` the problem disappeared.
> It is also resolved when changing the store to _inMemoryWindowStore_.*
> In the SO question you can find the relevant Java code. I don't have a sample project to share at the moment which replicates the problem, but it is easily repeatable in our project.
> Such behaviour can have serious implications for business logic; in our case, messages were accidentally skipped without being properly processed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
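
For readers following the checkpoint hypothesis in the quoted description, here is a minimal toy model of it in plain Java. It is only a sketch under stated assumptions: `CheckpointModel` and all of its members are hypothetical names invented for illustration, and none of this is Kafka Streams code or a claim about how the library is implemented. It models a local store that survives a failed attempt, a changelog that only ever holds committed records, and a restore step that either trusts an existing checkpoint or rebuilds the store from the changelog.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: every name here is hypothetical, none of it is Kafka Streams code.
class CheckpointModel {
    // Committed changelog records; writes from aborted transactions never land here.
    final List<String> changelog = new ArrayList<>();
    // Persistent local store on one node; it survives a failed processing attempt.
    final Set<String> localStore = new HashSet<>();
    // Offset recorded in the checkpoint file, or null when no file exists.
    Long checkpoint = null;

    // Standby replication: apply committed records, then write a checkpoint.
    void replicateAsStandby() {
        localStore.addAll(changelog);
        checkpoint = (long) changelog.size();
    }

    // Restore when a thread picks up the task.
    void restore(boolean trustCheckpoint) {
        if (checkpoint == null || !trustCheckpoint) {
            // No usable checkpoint: wipe the store and rebuild it from the changelog.
            localStore.clear();
            localStore.addAll(changelog);
            return;
        }
        // Checkpoint trusted: keep the store as-is and only replay newer records.
        for (int i = checkpoint.intValue(); i < changelog.size(); i++) {
            localStore.add(changelog.get(i));
        }
    }

    // Deduplication check followed by a downstream failure: the local write
    // happens, but nothing is appended to the changelog (transaction aborted).
    boolean isDuplicateThenFail(String commandId) {
        if (localStore.contains(commandId)) {
            return true;
        }
        localStore.add(commandId);
        return false;
    }

    // Replays the NODE 3 part of the report: standby, first takeover, second takeover.
    static boolean secondAttemptSeesDuplicate(boolean trustCheckpoint) {
        CheckpointModel node3 = new CheckpointModel();
        node3.changelog.add("cmd-valid");       // committed by the original active task
        node3.replicateAsStandby();             // checkpoint written at offset 1
        node3.restore(trustCheckpoint);         // first thread takes over
        node3.isDuplicateThenFail("cmd-faulty");
        node3.restore(trustCheckpoint);         // second thread takes over
        return node3.isDuplicateThenFail("cmd-faulty");
    }

    public static void main(String[] args) {
        System.out.println("checkpoint trusted: " + secondAttemptSeesDuplicate(true));
        System.out.println("store rebuilt: " + secondAttemptSeesDuplicate(false));
    }
}
```

In this model the second attempt reports a duplicate only when the stale checkpoint is trusted; rebuilding the store from the changelog discards the uncommitted write. Whether the real cause matches this model is exactly what the ticket is trying to establish.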