[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109604#comment-17109604
 ] 

Guozhang Wang commented on KAFKA-10010:
---------------------------------------

We intentionally did not close standby tasks because they are likely to be 
reassigned back. Here I feel the issue is that when we get an exception in 
registerStateStores, in order to keep the standby tasks in CREATED state we 
need to make sure any partially registered stores are de-registered in 
`closeStateManager`, thoughts?

> Should close standby task for safety during HandleLostAll
> ---------------------------------------------------------
>
>                 Key: KAFKA-10010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10010
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-0000000007 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to