Guozhang Wang created KAFKA-7112:
------------------------------------
Summary: StreamThread does not check for state again after pollRequests()
Key: KAFKA-7112
URL: https://issues.apache.org/jira/browse/KAFKA-7112
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Guozhang Wang
In StreamThread's main loop, we have:
{code}
if (state == State.PARTITIONS_ASSIGNED) {
    // try to fetch some records with zero poll millis
    // to unblock the restoration as soon as possible
    records = pollRequests(Duration.ZERO);
    if (taskManager.updateNewAndRestoringTasks()) {
        setState(State.RUNNING);
    }
}
{code}
in which we first check the state and, if it is {{PARTITIONS_ASSIGNED}}, call
{{consumer.poll()}} and then {{taskManager.updateNewAndRestoringTasks()}}.
There is a race condition, though: if another rebalance gets triggered,
{{onPartitionsRevoked}} is called, in which we call
{{restoreConsumer.unsubscribe()}}. If we then call
{{taskManager.updateNewAndRestoringTasks()}} right away, we see this:
{code}
Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
{code}
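One possible direction, as the summary suggests, is to check the state again after {{pollRequests()}} returns and skip the restore step if it is no longer {{PARTITIONS_ASSIGNED}}. The following is only a minimal, self-contained sketch of that idea, not the actual StreamThread code or a proposed patch. The class {{StateRecheckSketch}}, its fields, and the {{recheckState}} flag are hypothetical stand-ins. It assumes the revocation callback runs while {{pollRequests()}} is executing, and it models the restore consumer as a single boolean.

```java
public class StateRecheckSketch {
    enum State { PARTITIONS_ASSIGNED, PARTITIONS_REVOKED, RUNNING }

    private State state = State.PARTITIONS_ASSIGNED;
    // Hypothetical stand-in for whether restoreConsumer still has a subscription.
    private boolean restoreConsumerSubscribed = true;
    private final boolean rebalanceDuringPoll;

    StateRecheckSketch(boolean rebalanceDuringPoll) {
        this.rebalanceDuringPoll = rebalanceDuringPoll;
    }

    // Stand-in for pollRequests(Duration.ZERO). Assumption: a rebalance may
    // run the revocation callback while this call is in progress.
    private void pollRequests() {
        if (rebalanceDuringPoll) {
            restoreConsumerSubscribed = false;   // models restoreConsumer.unsubscribe()
            state = State.PARTITIONS_REVOKED;    // models the state change on revocation
        }
    }

    // Stand-in for taskManager.updateNewAndRestoringTasks(), which polls the
    // restore consumer and fails if that consumer has been unsubscribed.
    private boolean updateNewAndRestoringTasks() {
        if (!restoreConsumerSubscribed) {
            throw new IllegalStateException(
                "Consumer is not subscribed to any topics or assigned any partitions");
        }
        return true;
    }

    // One iteration of the loop body from the report. With recheckState set,
    // the state is examined again after the poll before touching restoration.
    State runOnce(boolean recheckState) {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
            if (recheckState && state != State.PARTITIONS_ASSIGNED) {
                return state; // a rebalance intervened, so skip the restore step
            }
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(new StateRecheckSketch(false).runOnce(true));
        System.out.println(new StateRecheckSketch(true).runOnce(true));
        try {
            new StateRecheckSketch(true).runOnce(false);
        } catch (IllegalStateException e) {
            System.out.println("without re-check: " + e.getMessage());
        }
    }
}
```

In this sketch the unguarded path reproduces the {{IllegalStateException}} message from the trace above, while the guarded path returns early and leaves the thread in the revoked state so the next loop iteration can handle the new assignment.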