[ https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-16017:
------------------------------------
    Fix Version/s: 3.5.3
                   3.4.2
                   3.6.2

> Checkpointed offset is incorrect when task is revived and restoring
> --------------------------------------------------------------------
>
>                 Key: KAFKA-16017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16017
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 3.4.2, 3.7.0, 3.6.2, 3.5.3
>
>
> Streams checkpoints the wrong offset when a task is revived after a
> {{TaskCorruptedException}} and the task is then migrated to another stream
> thread during restoration.
> This might happen in a situation like the following if the Streams
> application runs under EOS:
> 1. Streams encounters a network error, which triggers a
> {{TaskCorruptedException}}.
> 2. The task that encountered the exception is closed dirty and revived. The
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received, the revived task is
> restoring.
> 4. When the sync is received, the revived task is revoked and a new
> rebalance is triggered. During the revocation the task is closed cleanly
> and a checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from
> which it was revoked, reads the checkpoint, and starts restoring. (It might
> be enough if the task moves to a stream thread on the same Streams client
> that shares the same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue, one can restart the stream thread and delete the
> state on disk. After that, the state restores completely from the changelog
> topic and no longer misses any records.
> The root cause is that the checkpoint written in step 4 contains the offset
> that the record collector stored when it sent the records to the changelog
> topic.
> However, since the state directory is wiped out in step 2, the state does
> not contain those records anymore. It only contains the records that it
> restored in step 3, which might be fewer. The root cause of this is that
> the offsets in the record collector are not cleaned up when the record
> collector is closed.
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.
> The repro can be started with
> {code}
> ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
> {code}
> The repro writes records into a state store and tries to retrieve them
> again
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
> It will throw an {{IllegalStateException}} if it cannot find a record in
> the state
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
> Once the offsets in the record collector are cleared on close
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
> and
> https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
> the {{IllegalStateException}} does not occur anymore.
> In the logs you can check for
> - {{Restore batch end offset is}}, which shows the restored offsets in the
> state.
> - {{task [0_1] Writing checkpoint:}}, which shows the written checkpoints.
> - {{task [0_1] Checkpointable offsets}}, which shows the offsets stored
> when sending records to the changelog topic
> {{RestoreIntegrationTesttest-stateStore-changelog-1}}.
> Always look at the last instances of these before the
> {{IllegalStateException}} is thrown.
> You will see that the restored offsets are less than the offsets that are
> written to the checkpoint. The offsets written to the checkpoint come from
> the offsets stored when sending the records to the changelog topic.
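
The root cause described in the issue can be illustrated with a small toy model. This is a minimal sketch under stated assumptions, not Kafka's actual code: {{ToyRecordCollector}}, {{checkpointAfterRevival}}, the partition name, and the example offsets are all hypothetical. It also assumes that, when the checkpoint is written, offsets remembered by the record collector take precedence over restored offsets, which is how the issue description reads.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the stale-offset problem; these are NOT Kafka's real classes. */
final class CheckpointSketch {

    /** Hypothetical stand-in for a record collector: remembers the last offset sent per changelog partition. */
    static final class ToyRecordCollector {
        private final Map<String, Long> offsets = new HashMap<>();

        void recordSent(String changelogPartition, long offset) {
            offsets.put(changelogPartition, offset);
        }

        Map<String, Long> offsets() {
            return new HashMap<>(offsets);
        }

        /** The flag models the difference between the buggy and the fixed behavior. */
        void close(boolean clearOffsets) {
            if (clearOffsets) {
                offsets.clear();
            }
        }
    }

    /** Returns the offset that ends up in the checkpoint after the task is revived and then revoked. */
    static long checkpointAfterRevival(boolean clearOffsetsOnClose, long sentOffset, long restoredOffset) {
        String partition = "store-changelog-1"; // hypothetical partition name
        ToyRecordCollector collector = new ToyRecordCollector();

        // Before the error: the task sent records to the changelog up to sentOffset.
        collector.recordSent(partition, sentOffset);

        // Step 2: the task is closed dirty and revived; the local state is wiped.
        collector.close(clearOffsetsOnClose);

        // Step 3: restoration only reaches restoredOffset before the task is revoked.
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put(partition, restoredOffset);

        // Step 4: on clean close, offsets still held by the collector override the
        // restored offsets (assumption of this sketch).
        checkpoint.putAll(collector.offsets());
        return checkpoint.get(partition);
    }

    public static void main(String[] args) {
        System.out.println("offsets kept on close:    " + checkpointAfterRevival(false, 100L, 40L));
        System.out.println("offsets cleared on close: " + checkpointAfterRevival(true, 100L, 40L));
    }
}
```

With offsets kept on close, the sketch checkpoints 100 although the state only holds records up to 40, so a later restoration would start from the checkpoint and skip the records in between. With offsets cleared on close, the checkpoint matches the restored offset of 40.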