Repository: kafka
Updated Branches:
  refs/heads/0.11.0 97f5525ca -> c50bee94a
KAFKA-5167: Release state locks in case of failure Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy <[email protected]>, Guozhang Wang <[email protected]> Closes #3449 from mjsax/kafka-5167-streams-task-gets-stuck-after-re-balance-due-to-LockException (cherry picked from commit 70e949d522eba72b22c2619c4fde372d0f1a26b3) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c50bee94 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c50bee94 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c50bee94 Branch: refs/heads/0.11.0 Commit: c50bee94a3155822cc9d448673965b9d9f9ace54 Parents: 97f5525 Author: Matthias J. Sax <[email protected]> Authored: Wed Jul 5 20:20:51 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Jul 5 20:21:04 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 26 ++- .../processor/internals/StreamThreadTest.java | 213 ++++++++++++++++++- 2 files changed, 231 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c50bee94/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9aa145b..db42061 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1121,13 +1121,31 @@ public class StreamThread extends Thread { @Override public void apply(final StreamTask task) { - task.suspend(); + try { + task.suspend(); 
+ } catch (final Exception e) { + try { + task.close(false); + } catch (final Exception f) { + log.error("{} Closing task {} failed: ", logPrefix, task.id, f); + } + throw e; + } } })); for (final StandbyTask task : standbyTasks.values()) { try { - task.suspend(); + try { + task.suspend(); + } catch (final Exception e) { + try { + task.close(false); + } catch (final Exception f) { + log.error("{} Closing standby task {} failed: ", logPrefix, task.id, f); + } + throw e; + } } catch (final RuntimeException e) { firstException.compareAndSet(null, e); } @@ -1231,6 +1249,7 @@ public class StreamThread extends Thread { } } + // visible for testing protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { streamsMetrics.taskCreatedSensor.record(); @@ -1317,7 +1336,8 @@ public class StreamThread extends Thread { taskCreator.retryWithBackoff(newTasks, start); } - private StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) { + // visible for testing + protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) { streamsMetrics.taskCreatedSensor.record(); final ProcessorTopology topology = builder.build(id.topicGroupId); http://git-wip-us.apache.org/repos/asf/kafka/blob/c50bee94/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5a31ccd..a0882cf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -542,9 +542,16 @@ public class StreamThreadTest { private class MockStreamsPartitionAssignor extends 
StreamPartitionAssignor { private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment; + private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment; MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) { + this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + } + + MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment, + final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) { this.activeTaskAssignment = activeTaskAssignment; + this.standbyTaskAssignment = standbyTaskAssignment; } @Override @@ -553,6 +560,11 @@ public class StreamThreadTest { } @Override + Map<TaskId, Set<TopicPartition>> standbyTasks() { + return standbyTaskAssignment; + } + + @Override public void close() {} } @@ -1249,10 +1261,10 @@ public class StreamThreadTest { } }); - StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); - Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); updatedTopicsField.setAccessible(true); - Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates); + final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates); updatedTopics.add(t1.topic()); builder.updateSubscriptions(subscriptionUpdates, null); @@ -1685,13 +1697,13 @@ public class StreamThreadTest { final TaskId taskId2 = new TaskId(0, 0); final TaskId taskId3 = new TaskId(0, 0); - List<TaskId> activeTasks = Arrays.asList(taskId1); + List<TaskId> activeTasks = Utils.mkList(taskId1); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); AssignmentInfo info = new 
AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>()); - topicPartitions.addAll(Arrays.asList(topicPartition1)); + topicPartitions.addAll(Utils.mkList(topicPartition1)); PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); partitionAssignor.onAssignment(assignment); @@ -1726,6 +1738,197 @@ public class StreamThreadTest { } + @Test + public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupTest(taskId); + + final StateDirectory testStateDir = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + fail("Should have thrown exception"); + } catch (final Exception e) { + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupTest(taskId); + thread.start(); + + final StateDirectory testStateDir = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.close(); + thread.join(); + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + private StreamThread setupTest(final TaskId taskId) { + final TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId(applicationId); + builder.addSource("source", "topic"); + + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final StateDirectory stateDirectory = new StateDirectory( + applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), 
+ mockTime); + + final TestStreamTask testStreamTask = new TestStreamTask(taskId, + applicationId, + Utils.mkSet(new TopicPartition("topic", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory) { + + @Override + public void suspend() { + throw new RuntimeException("KABOOM!!!"); + } + }; + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { + return testStreamTask; + } + }; + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, testStreamTask.partitions); + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + + return thread; + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread = setupStandbyTest(taskId); + + final StateDirectory testStateDir = new StateDirectory(applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + fail("Should have thrown exception"); + } catch (final Exception e) { + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + final StreamThread thread 
= setupStandbyTest(taskId); + thread.start(); + + final StateDirectory testStateDir = new StateDirectory(applicationId, + config.getString(StreamsConfig.STATE_DIR_CONFIG), + mockTime); + + assertFalse(testStateDir.lock(taskId, 0)); + try { + thread.close(); + thread.join(); + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + private StreamThread setupStandbyTest(final TaskId taskId) { + final String storeName = "store"; + final String changelogTopic = applicationId + "-" + storeName + "-changelog"; + + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId(applicationId); + builder.stream("topic1").groupByKey().count(storeName); + + final MockClientSupplier clientSupplier = new MockClientSupplier(); + clientSupplier.restoreConsumer.updatePartitions(changelogTopic, + Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null))); + clientSupplier.restoreConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() { + { + put(new TopicPartition(changelogTopic, 0), 0L); + } + }); + clientSupplier.restoreConsumer.updateEndOffsets(new HashMap<TopicPartition, Long>() { + { + put(new TopicPartition(changelogTopic, 0), 0L); + } + }); + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + + @Override + protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) { + return new StandbyTask( + taskId, + applicationId, + partitions, + builder.build(0), + clientSupplier.consumer, + new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000), + StreamThreadTest.this.config, + new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()), + stateDirectory) { + + @Override + public void suspend() { + throw new 
RuntimeException("KABOOM!!!"); + } + + @Override + public void commit() { + throw new RuntimeException("KABOOM!!!"); + } + }; + } + }; + + final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); + standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0))); + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks)); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet()); + + return thread; + } + private void initPartitionGrouper(final StreamsConfig config, final StreamThread thread, final MockClientSupplier clientSupplier) {
