[ https://issues.apache.org/jira/browse/KAFKA-9972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104732#comment-17104732 ]

Sophie Blee-Goldman commented on KAFKA-9972:
--------------------------------------------

Fair enough. Then would you propose the following contract: the Task
implementation validates and enforces the state for any multi-phase API (e.g.
preCommit + postCommit), but accepts and internally handles any fully
encapsulated API (e.g. initializeIfNeeded)? I don't think we need to drop
everything to bring all the existing APIs in line, but it would be nice to
have something to inform future changes to the TaskManager-Task interactions.
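A minimal Java sketch of the contract proposed above. It uses a simplified,
hypothetical `SketchTask` with a hypothetical `SketchState` enum (not the real
org.apache.kafka.streams.processor.internals.Task API). The multi-phase commit
methods validate the state and fail fast, while the fully encapsulated
`initializeIfNeeded` accepts any state and does only what is needed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified task states; the real Kafka Streams Task.State differs.
enum SketchState { CREATED, RESTORING, RUNNING }

class SketchTask {
    private SketchState state = SketchState.CREATED;
    private boolean commitInProgress = false;
    final List<String> calls = new ArrayList<>();

    SketchState state() { return state; }

    // Fully encapsulated API: safe to call in any state, handles its own preconditions.
    void initializeIfNeeded() {
        if (state == SketchState.CREATED) {
            // Standby-like shortcut: straight to RUNNING (an active task would restore first).
            state = SketchState.RUNNING;
            calls.add("initialized");
        }
        // In any other state there is nothing to do, and no exception is thrown.
    }

    // Multi-phase API, phase one: the caller must respect the state machine.
    void prepareCommit() {
        if (state != SketchState.RUNNING) {
            throw new IllegalStateException(
                "Illegal state " + state + " while preparing task for committing");
        }
        commitInProgress = true;
        calls.add("prepareCommit");
    }

    // Multi-phase API, phase two: only valid after a successful prepareCommit.
    void postCommit() {
        if (!commitInProgress) {
            throw new IllegalStateException("postCommit called without prepareCommit");
        }
        commitInProgress = false;
        calls.add("postCommit");
    }
}
```

Under this split, calling `initializeIfNeeded` twice is harmless, but calling
`prepareCommit` on a CREATED task still throws, which mirrors the
IllegalStateException in the log quoted from the issue.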

Anyway, in that case I'll amend my proposal for this specific topic and
instead suggest that all committing go through the same TaskManager code
path. We're still vulnerable to calling Task#commit on a CREATED task, at
least in #handleAssignment.
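A minimal sketch of "one commit path for everything", assuming hypothetical
`CommitTask` and `CommitTaskManager` classes (not the real TaskManager code).
Both `commitAll` and a stand-in for the commit done during handleAssignment
(`commitRevoked`, also hypothetical) delegate to a single `commit` method, so
the check that keeps CREATED tasks out of the commit exists in exactly one
place.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical states and task; not the real Kafka Streams Task interface.
enum CommitState { CREATED, RUNNING }

class CommitTask {
    final String id;
    final CommitState state;
    boolean dirty;

    CommitTask(String id, CommitState state, boolean dirty) {
        this.id = id;
        this.state = state;
        this.dirty = dirty;
    }

    boolean commitNeeded() { return dirty; }

    void prepareCommit() {
        if (state != CommitState.RUNNING) {
            throw new IllegalStateException(
                "Illegal state " + state + " while preparing task " + id + " for committing");
        }
    }

    void postCommit() { dirty = false; }
}

class CommitTaskManager {
    final List<String> committed = new ArrayList<>();

    // The single commit path: every caller delegates here.
    int commit(Collection<CommitTask> tasks) {
        List<CommitTask> toCommit = new ArrayList<>();
        for (CommitTask task : tasks) {
            // Only RUNNING tasks with pending changes are committed.
            if (task.state == CommitState.RUNNING && task.commitNeeded()) {
                toCommit.add(task);
            }
        }
        toCommit.forEach(CommitTask::prepareCommit);
        // A real implementation would commit offsets / flush state between the two phases.
        for (CommitTask task : toCommit) {
            task.postCommit();
            committed.add(task.id);
        }
        return toCommit.size();
    }

    int commitAll(Collection<CommitTask> allTasks) {
        return commit(allTasks);
    }

    // Hypothetical stand-in for committing revoked tasks in handleAssignment.
    int commitRevoked(Collection<CommitTask> revokedTasks) {
        return commit(revokedTasks);
    }
}
```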

I'd also prefer to do the task filtering in TaskManager rather than in
StreamThread. If we can't keep this logic encapsulated inside Task, we should
at least aim to keep it encapsulated inside TaskManager. Compare with
TaskManager#tryToCompleteRestoration, for example, which filters out the
non-RESTORING tasks inside the TaskManager (of course, whether we need this
filtering at all for Task#completeRestoration is a question for the contract
above).
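To make the comparison concrete, here is a simplified sketch of
restoration-style filtering kept inside the task manager. `RestoreTask`,
`RestoreState`, `RestoreTaskManager` and the `changelogsCaughtUp` flag are
hypothetical illustrations, not the actual TaskManager#tryToCompleteRestoration
implementation. The caller (StreamThread in the real code) only invokes the
method and never inspects task states itself.

```java
import java.util.List;

// Hypothetical states and task for this example only.
enum RestoreState { CREATED, RESTORING, RUNNING }

class RestoreTask {
    final String id;
    RestoreState state;
    final boolean changelogsCaughtUp;

    RestoreTask(String id, RestoreState state, boolean changelogsCaughtUp) {
        this.id = id;
        this.state = state;
        this.changelogsCaughtUp = changelogsCaughtUp;
    }

    void completeRestoration() {
        if (state != RestoreState.RESTORING) {
            throw new IllegalStateException(
                "Illegal state " + state + " while completing restoration of task " + id);
        }
        state = RestoreState.RUNNING;
    }
}

class RestoreTaskManager {
    private final List<RestoreTask> tasks;

    RestoreTaskManager(List<RestoreTask> tasks) { this.tasks = tasks; }

    boolean tryToCompleteRestoration() {
        for (RestoreTask task : tasks) {
            if (task.state != RestoreState.RESTORING) {
                continue; // CREATED and RUNNING tasks are filtered out here, not by the caller
            }
            if (task.changelogsCaughtUp) {
                task.completeRestoration();
            }
        }
        // true only when every task has reached RUNNING
        return tasks.stream().allMatch(t -> t.state == RestoreState.RUNNING);
    }
}
```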

> Corrupted standby task could be committed
> -----------------------------------------
>
>                 Key: KAFKA-9972
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9972
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> A corrupted standby task could be revived and transition to the CREATED
> state; it is then picked up by `taskManager.commitAll` in the next runOnce,
> causing an illegal state:
> ```
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,646] WARN 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException 
> fetching records from restore consumer for partitions 
> [stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1], it 
> is likely that the consumer's position has fallen out of the topic partition 
> offset range because the topic was truncated or compacted on the broker, 
> marking the corresponding tasks as corrupted and re-initializing it later. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,646] WARN 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Detected the states of tasks 
> {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]}
>  are corrupted. Will close the task as dirty and re-create and bootstrap from 
> scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
> {1_1=[stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-1]}
>  are corrupted and hence needs to be re-initialized
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:428)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:680)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,652] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> [Consumer 
> clientId=stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1-restore-consumer,
>  groupId=null] Unsubscribed all topics or patterns and assigned partitions 
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,652] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> standby-task [1_1] Prepared dirty close 
> (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,679] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> standby-task [1_1] Closed dirty 
> (org.apache.kafka.streams.processor.internals.StandbyTask)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,751] ERROR 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) 
> java.lang.IllegalStateException: Illegal state CREATED while preparing 
> standby task 1_1 for committing
>         at org.apache.kafka.streams.processor.internals.StandbyTask.prepareCommit(StandbyTask.java:134)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:752)
>         at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:741)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:863)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:725)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:558)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> [2020-05-07T20:57:23-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-0f819f0a58017b05b_streamslog) [2020-05-08 
> 03:57:22,751] INFO 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] 
> stream-thread 
> [stream-soak-test-5ab3951c-2ca8-40a8-9096-0957e70a21b7-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> ```
> There are two possible solutions: either we deprecate `commitAll` and always
> enforce a state check to commit tasks selectively, or we add a state check
> inside the standby task's commitNeeded call so that it takes the task's
> state into account. Added a fix for option one here.
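For comparison with option one, here is a rough sketch of option two (a state
check inside the standby task's commitNeeded). `SketchStandbyTask`,
`StandbyState`, `NaiveCommitter` and the `pendingChanges` flag are
hypothetical. The sketch also assumes, purely for illustration, that a revived
task can still carry a stale "needs commit" flag; the issue does not say
exactly why the revived task was selected for commit.

```java
import java.util.List;

// Hypothetical states for the sketch; the real StandbyTask has more.
enum StandbyState { CREATED, RUNNING }

class SketchStandbyTask {
    final String id;
    StandbyState state = StandbyState.CREATED;
    boolean pendingChanges = false; // assumption: may survive a dirty close + revive

    SketchStandbyTask(String id) { this.id = id; }

    void initializeIfNeeded() {
        if (state == StandbyState.CREATED) {
            state = StandbyState.RUNNING;
        }
    }

    // Corrupted -> closed dirty -> revived: the task is back in CREATED.
    void reviveAfterCorruption() {
        state = StandbyState.CREATED;
    }

    // Option two: a task that is not RUNNING never asks to be committed.
    boolean commitNeeded() {
        return state == StandbyState.RUNNING && pendingChanges;
    }

    void prepareCommit() {
        if (state != StandbyState.RUNNING) {
            throw new IllegalStateException("Illegal state " + state
                + " while preparing standby task " + id + " for committing");
        }
    }

    void postCommit() { pendingChanges = false; }
}

class NaiveCommitter {
    // Like commitAll without any state filtering: relies only on commitNeeded().
    static int commitAll(List<SketchStandbyTask> tasks) {
        int committed = 0;
        for (SketchStandbyTask task : tasks) {
            if (task.commitNeeded()) {
                task.prepareCommit();
                task.postCommit();
                committed++;
            }
        }
        return committed;
    }
}
```

The trade-off matches the two options in the issue: option one keeps the
filter in the caller's commit path, while option two lets every caller stay
naive but requires each task type to keep commitNeeded consistent with its
state.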



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
