[ https://issues.apache.org/jira/browse/KAFKA-9972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102926#comment-17102926 ]
Sophie Blee-Goldman commented on KAFKA-9972:
--------------------------------------------

I see that we already committed a fix for this, but I'm a little concerned that we're only covering this one specific use case rather than the general problem of illegally attempting to commit a task in CREATED. AFAIK the corrupted standby case is just the one we happened to catch in soak, not the only possible case. WDYT?

cc [~guozhang] [~vvcephei] – see comment on https://github.com/apache/kafka/commit/7907b5a6e921cec3b5fad2a4f84a78a851140755#r39046991

> Corrupted standby task could be committed
> -----------------------------------------
>
>                 Key: KAFKA-9972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9972
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> A corrupted standby task could be revived and transition to the CREATED state, after which `taskManager.commitAll` attempts to commit it in the next `runOnce`, causing an illegal state:
> ```
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,646] WARN [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,646] WARN [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Detected the states of tasks {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]} are corrupted and hence needs to be re-initialized
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,652] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] [Consumer clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,652] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] standby-task [1_1] Prepared dirty close (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,679] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] standby-task [1_1] Closed dirty (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,751] ERROR [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) java.lang.IllegalStateException: Illegal state CREATED while preparing standby task 1_1 for committing
>         at org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 03:57:22,751] INFO [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] stream-thread [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> ```
> Two solutions here: either we deprecate `commitAll` and always enforce a state check to selectively commit tasks, or we enforce a state check inside the standby task's `commitNeeded` call. Added a fix for option one here.
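
For illustration only, here is a rough sketch of what "always enforce a state check to selectively commit tasks" could look like. It is not the committed fix and does not use the real Kafka Streams classes: `CommitGuardSketch`, `SketchTask`, the simplified `State` enum and `commitSelectively` are all hypothetical names, and the rule that only RUNNING tasks may be committed is an assumption made to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitGuardSketch {
    // Hypothetical, simplified task states; not the actual Kafka Streams enum.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Hypothetical stand-in for a task; real tasks carry far more state.
    static final class SketchTask {
        final String id;
        final State state;
        final boolean commitNeeded;

        SketchTask(String id, State state, boolean commitNeeded) {
            this.id = id;
            this.state = state;
            this.commitNeeded = commitNeeded;
        }

        // Mirrors the failure mode in the log: committing in a wrong state throws.
        void prepareCommit() {
            if (state != State.RUNNING) {
                throw new IllegalStateException(
                    "Illegal state " + state + " while preparing task " + id + " for committing");
            }
        }
    }

    // Assumption: only RUNNING tasks that report commitNeeded are committable.
    static boolean isCommittable(SketchTask task) {
        return task.state == State.RUNNING && task.commitNeeded;
    }

    // Commits only the tasks that pass the state check and returns their ids.
    // A task in CREATED (e.g. a just-revived standby) is skipped, not committed.
    static List<String> commitSelectively(List<SketchTask> tasks) {
        List<String> committed = new ArrayList<>();
        for (SketchTask task : tasks) {
            if (isCommittable(task)) {
                task.prepareCommit();
                committed.add(task.id);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<SketchTask> tasks = List.of(
            new SketchTask("1_1", State.CREATED, true),   // revived standby, must be skipped
            new SketchTask("0_0", State.RUNNING, true));
        System.out.println(commitSelectively(tasks));
    }
}
```

Because the check is keyed on the task state rather than on "was this task just revived from corruption", it would also cover other paths that leave a task in CREATED at commit time, which is the general case raised in the comment above.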