This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 0f0093c KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021) 0f0093c is described below commit 0f0093c1693d6ea4091228cd3a87db0219d028a2 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Mon Jul 15 15:02:13 2019 -0700 KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (#7021) Reviewers: Matthias J. Sax <matth...@confluent.io>, Bruno Cadonna <br...@confluent.io>, John Roesler <j...@confluent.io> --- .../streams/processor/internals/StreamThread.java | 34 +++++++-- .../processor/internals/StreamThreadTest.java | 82 ++++++++++++++++++++++ 2 files changed, 109 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 419e181..4b157ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -194,14 +194,20 @@ public class StreamThread extends Thread { oldState = state; if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) { + log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " + + "only DEAD state is a valid next state", newState); // when the state is already in PENDING_SHUTDOWN, all other transitions will be // refused but we do not throw exception here return null; } else if (state == State.DEAD) { + log.debug("Ignoring request to transit from DEAD to {}: " + + "no valid next state after DEAD", newState); // when the state is already in NOT_RUNNING, all its transitions // will be refused but we do not throw exception here return null; } else if (state == State.PARTITIONS_REVOKED && newState == State.PARTITIONS_REVOKED) { + log.debug("Ignoring request to transit from PARTITIONS_REVOKED to PARTITIONS_REVOKED: " + + "self transition is not allowed"); // when the state is already in PARTITIONS_REVOKED, its transition to itself will be // refused but we do not throw exception here return null; @@ -272,17 +278,23 @@ public class StreamThread extends Thread { final long start = time.milliseconds(); try { if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) { - return; - } - if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) { + log.debug( + "Skipping task creation in rebalance because we are already in {} state.", + streamThread.state() + ); + } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) { + log.debug( + "Encountered assignment error during partition assignment: {}. Skipping task initialization", + streamThread.assignmentErrorCode + ); + } else { + log.debug("Creating tasks based on assignment."); taskManager.createTasks(assignment); } } catch (final Throwable t) { log.error( "Error caught during partition assignment, " + - "will abort the current process and re-throw at the end of rebalance: {}", - t - ); + "will abort the current process and re-throw at the end of rebalance", t); streamThread.setRebalanceException(t); } finally { log.info("partition assignment took {} ms.\n" + @@ -833,7 +845,6 @@ public class StreamThread extends Thread { // Visible for testing void runOnce() { final ConsumerRecords<byte[], byte[]> records; - now = time.milliseconds(); if (state == State.PARTITIONS_ASSIGNED) { @@ -854,6 +865,15 @@ public class StreamThread extends Thread { throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); } + // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). + // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). + // Should only proceed when the thread is still running after #pollRequests(), because no external state mutation + // could affect the task manager state beyond this point within #runOnce(). + if (!isRunning()) { + log.debug("State already transits to {}, skipping the run once call after poll request", state); + return; + } + final long pollLatency = advanceNowAndComputeLatency(); if (records != null && !records.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index f99bb89..7a4e33b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -20,8 +20,10 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; @@ -73,6 +75,7 @@ import org.junit.Test; import java.io.File; import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -665,6 +668,85 @@ public class StreamThreadTest { } @Test + public void shouldNotThrowWhenPendingShutdownInRunOnce() { + mockRunOnce(true); + } + + @Test + public void shouldNotThrowWithoutPendingShutdownInRunOnce() { + // A reference test to verify that without intermediate shutdown the runOnce should pass + // without any exception. + mockRunOnce(false); + } + + private void mockRunOnce(final boolean shutdownOnPoll) { + final Collection<TopicPartition> assignedPartitions = Collections.singletonList(t1p1); + class MockStreamThreadConsumer<K, V> extends MockConsumer<K, V> { + + private StreamThread streamThread; + + private MockStreamThreadConsumer(final OffsetResetStrategy offsetResetStrategy) { + super(offsetResetStrategy); + } + + @Override + public synchronized ConsumerRecords<K, V> poll(final Duration timeout) { + assertNotNull(streamThread); + if (shutdownOnPoll) { + streamThread.shutdown(); + } + streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + return super.poll(timeout); + } + + private void setStreamThread(final StreamThread streamThread) { + this.streamThread = streamThread; + } + } + + final MockStreamThreadConsumer<byte[], byte[]> mockStreamThreadConsumer = + new MockStreamThreadConsumer<>(OffsetResetStrategy.EARLIEST); + + final TaskManager taskManager = new TaskManager(new MockChangelogReader(), + processId, + "log-prefix", + mockStreamThreadConsumer, + streamsMetadataState, + null, + null, + null, + new AssignedStreamsTasks(new LogContext()), + new AssignedStandbyTasks(new LogContext())); + taskManager.setConsumer(mockStreamThreadConsumer); + taskManager.setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap()); + + final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, clientId); + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + mockStreamThreadConsumer, + mockStreamThreadConsumer, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext(""), + new AtomicInteger() + ).updateThreadMetadata(getSharedAdminClientId(clientId)); + + mockStreamThreadConsumer.setStreamThread(thread); + mockStreamThreadConsumer.assign(assignedPartitions); + mockStreamThreadConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + + addRecord(mockStreamThreadConsumer, 1L, 0L); + thread.setState(StreamThread.State.STARTING); + thread.setState(StreamThread.State.PARTITIONS_REVOKED); + thread.runOnce(); + } + + @Test public void shouldOnlyShutdownOnce() { final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);