[ 
https://issues.apache.org/jira/browse/KAFKA-9972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9972.
------------------------------------
    Fix Version/s: 2.6.0
       Resolution: Fixed

> Corrupted standby task could be committed
> -----------------------------------------
>
>                 Key: KAFKA-9972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9972
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>             Fix For: 2.6.0
>
>
> A corrupted standby task could be revived and transition to the CREATED state. 
> It would then be picked up by `taskManager.commitAll` in the next `runOnce` 
> iteration, causing an illegal state:
> ```
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,646] WARN 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException 
> fetching records from restore consumer for partitions 
> [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1], it 
> is likely that the consumer's position has fallen out of the topic partition 
> offset range because the topic was truncated or compacted on the broker, 
> marking the corresponding tasks as corrupted and re-initializing it later. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,646] WARN 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Detected the states of tasks 
> {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]}
>  are corrupted. Will close the task as dirty and re-create and bootstrap from 
> scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
> {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]}
>  are corrupted and hence needs to be re-initialized
>         at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,652] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> [Consumer 
> clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer,
>  groupId=null] Unsubscribed all topics or patterns and assigned partitions 
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,652] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> standby-task [1_1] Prepared dirty close 
> (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,679] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> standby-task [1_1] Closed dirty 
> (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,751] ERROR 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
> java.lang.IllegalStateException: Illegal state CREATED while preparing 
> standby task 1_1 for committing
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,751] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> ```
> Two solutions are possible: either we deprecate `commitAll` and always enforce 
> a state check so that only eligible tasks are committed, or we add a state 
> check inside the standby task's `commitNeeded` call. A fix implementing 
> option one has been added.
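
The first option in the description above (commit selectively, based on task state) can be illustrated with a minimal sketch. This is not the actual Kafka Streams code: the `Task` class, the `State` enum, and the `commitEligible` method are hypothetical, simplified stand-ins, and the sketch assumes that only RUNNING tasks may be committed.

```java
import java.util.ArrayList;
import java.util.List;

public class SelectiveCommitSketch {
    // Hypothetical, simplified lifecycle; not the real Kafka Streams task states.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Hypothetical stand-in for a standby task.
    static final class Task {
        final String id;
        final State state;

        Task(String id, State state) {
            this.id = id;
            this.state = state;
        }

        // Mirrors the failure mode in the log: committing outside RUNNING throws.
        void prepareCommit() {
            if (state != State.RUNNING) {
                throw new IllegalStateException(
                    "Illegal state " + state + " while preparing standby task "
                        + id + " for committing");
            }
        }
    }

    // Check each task's state first and commit only the eligible ones,
    // instead of committing every task unconditionally.
    static List<String> commitEligible(List<Task> tasks) {
        List<String> committed = new ArrayList<>();
        for (Task task : tasks) {
            if (task.state == State.RUNNING) {
                task.prepareCommit();
                committed.add(task.id);
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
            new Task("0_0", State.RUNNING),
            new Task("1_1", State.CREATED)); // revived after corruption
        System.out.println(commitEligible(tasks)); // prints [0_0]
    }
}
```

In this sketch the revived task `1_1` is skipped rather than committed, so the `IllegalStateException` is never raised and the thread keeps running.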



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
