[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110638#comment-17110638
 ] 

Boyang Chen commented on KAFKA-10010:
-------------------------------------

For more context, the 
[reason|[https://github.com/apache/kafka/pull/8440/files#r407722022]] we have 
to keep the txn commit before handle task corruption, since otherwise under EOS 
beta the stream thread could actually abort other healthy tasks.

> Should close standby task for safety during HandleLostAll
> ---------------------------------------------------------
>
>                 Key: KAFKA-10010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10010
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-0000000007 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to