Guozhang Wang created KAFKA-7112:
------------------------------------

             Summary: StreamThread does not check for state again after 
pollRequests()
                 Key: KAFKA-7112
                 URL: https://issues.apache.org/jira/browse/KAFKA-7112
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Guozhang Wang


In StreamThread's main loop, we have:

{code}
        if (state == State.PARTITIONS_ASSIGNED) {
            // try to fetch some records with zero poll millis
            // to unblock the restoration as soon as possible
            records = pollRequests(Duration.ZERO);

            if (taskManager.updateNewAndRestoringTasks()) {
                setState(State.RUNNING);
            }
        }
{code}

in which we first check for state, and if it is {{PARTITIONS_ASSIGNED}} then 
call `consumer.poll()` and then call `askManager.updateNewAndRestoringTasks()`. 
There is a race condition though, that if another rebalance gets triggered, 
then `onPartitionRevoked` will be called in which we will 
{{restoreConsumer.unsubscribe();}}, and then if we call 
{{taskManager.updateNewAndRestoringTasks()}} right away we will see this:

{code}
Encountered the following error during processing: 
(org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or 
assigned any partitions
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
        at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to