This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 0f0093c  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
0f0093c is described below

commit 0f0093c1693d6ea4091228cd3a87db0219d028a2
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Mon Jul 15 15:02:13 2019 -0700

    KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021)
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Bruno Cadonna <br...@confluent.io>, John Roesler <j...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  | 34 +++++++--
 .../processor/internals/StreamThreadTest.java      | 82 ++++++++++++++++++++++
 2 files changed, 109 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 419e181..4b157ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -194,14 +194,20 @@ public class StreamThread extends Thread {
             oldState = state;
 
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
+                log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " +
+                              "only DEAD state is a valid next state", newState);
                 // when the state is already in PENDING_SHUTDOWN, all other transitions will be
                 // refused but we do not throw exception here
                 return null;
             } else if (state == State.DEAD) {
+                log.debug("Ignoring request to transit from DEAD to {}: " +
+                              "no valid next state after DEAD", newState);
                 // when the state is already in NOT_RUNNING, all its transitions
                 // will be refused but we do not throw exception here
                 return null;
             } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) {
+                log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " +
+                              "self transition is not allowed");
                 // when the state is already in PARTITIONS_REVOKED, its transition to itself will be
                 // refused but we do not throw exception here
                 return null;
@@ -272,17 +278,23 @@ public class StreamThread extends Thread {
             final long start = time.milliseconds();
             try {
                 if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
-                    return;
-                }
-                if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
+                    log.debug(
+                        "Skipping task creation in rebalance because we are already in {} state.",
+                        streamThread.state()
+                    );
+                } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
+                    log.debug(
+                        "Encountered assignment error during partition assignment: {}. Skipping task initialization",
+                        streamThread.assignmentErrorCode
+                    );
+                } else {
+                    log.debug("Creating tasks based on assignment.");
                     taskManager.createTasks(assignment);
                 }
             } catch (final Throwable t) {
                 log.error(
                     "Error caught during partition assignment, " +
-                        "will abort the current process and re-throw at the end of rebalance: {}",
-                    t
-                );
+                        "will abort the current process and re-throw at the end of rebalance", t);
                 streamThread.setRebalanceException(t);
             } finally {
                 log.info("partition assignment took {} ms.\n" +
@@ -833,7 +845,6 @@ public class StreamThread extends Thread {
     // Visible for testing
     void runOnce() {
         final ConsumerRecords<byte[], byte[]> records;
-
         now = time.milliseconds();
 
         if (state == State.PARTITIONS_ASSIGNED) {
@@ -854,6 +865,15 @@ public class StreamThread extends Thread {
             throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
         }
 
+        // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
+        // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
+        // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation
+        // could affect the task manager state beyond this point within #runOnce().
+        if (!isRunning()) {
+            log.debug("State already transits to {}, skipping the run once call after poll request", state);
+            return;
+        }
+
         final long pollLatency = advanceNowAndComputeLatency();
 
         if (records != null && !records.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f99bb89..7a4e33b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -20,8 +20,10 @@ import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -73,6 +75,7 @@ import org.junit.Test;
 import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -665,6 +668,85 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldNotThrowWhenPendingShutdownInRunOnce() {
+        mockRunOnce(true);
+    }
+
+    @Test
+    public void shouldNotThrowWithoutPendingShutdownInRunOnce() {
+        // A reference test to verify that without intermediate shutdown the runOnce should pass
+        // without any exception.
+        mockRunOnce(false);
+    }
+
+    private void mockRunOnce(final boolean shutdownOnPoll) {
+        final Collection<TopicPartition> assignedPartitions = Collections.singletonList(t1p1);
+        class MockStreamThreadConsumer<K, V> extends MockConsumer<K, V> {
+
+            private StreamThread streamThread;
+
+            private MockStreamThreadConsumer(final OffsetResetStrategy offsetResetStrategy) {
+                super(offsetResetStrategy);
+            }
+
+            @Override
+            public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
+                assertNotNull(streamThread);
+                if (shutdownOnPoll) {
+                    streamThread.shutdown();
+                }
+                streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+                return super.poll(timeout);
+            }
+
+            private void setStreamThread(final StreamThread streamThread) {
+                this.streamThread = streamThread;
+            }
+        }
+
+        final MockStreamThreadConsumer<byte[], byte[]> mockStreamThreadConsumer =
+            new MockStreamThreadConsumer<>(OffsetResetStrategy.EARLIEST);
+
+        final TaskManager taskManager = new TaskManager(new MockChangelogReader(),
+                                                        processId,
+                                                        "log-prefix",
+                                                        mockStreamThreadConsumer,
+                                                        streamsMetadataState,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        new AssignedStreamsTasks(new LogContext()),
+                                                        new AssignedStandbyTasks(new LogContext()));
+        taskManager.setConsumer(mockStreamThreadConsumer);
+        taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
+
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, clientId);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            mockStreamThreadConsumer,
+            mockStreamThreadConsumer,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            clientId,
+            new LogContext(""),
+            new AtomicInteger()
+        ).updateThreadMetadata(getSharedAdminClientId(clientId));
+
+        mockStreamThreadConsumer.setStreamThread(thread);
+        mockStreamThreadConsumer.assign(assignedPartitions);
+        
mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+
+        addRecord(mockStreamThreadConsumer, 1L, 0L);
+        thread.setState(StreamThread.State.STARTING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+        thread.runOnce();
+    }
+
+    @Test
     public void shouldOnlyShutdownOnce() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
