Repository: kafka Updated Branches: refs/heads/0.11.0 d77c057dd -> b872abf69
KAFKA-5603; Don't abort TX for zombie tasks Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Bill Bejeck <b...@confluent.io>, Damian Guy <damian....@gmail.com> Closes #3722 from mjsax/kafka-5603-dont-abort-tx-for-zombie-tasks-01101 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b872abf6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b872abf6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b872abf6 Branch: refs/heads/0.11.0 Commit: b872abf69b1308da57ad7ad24e9cffe3e5421128 Parents: d77c057 Author: Matthias J. Sax <matth...@confluent.io> Authored: Thu Aug 24 10:47:50 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Thu Aug 24 10:47:50 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/producer/MockProducer.java | 2 -- .../clients/producer/MockProducerTest.java | 10 -------- .../processor/internals/AbstractTask.java | 3 ++- .../processor/internals/AssignedTasks.java | 6 ++--- .../processor/internals/StandbyTask.java | 4 ++- .../streams/processor/internals/StreamTask.java | 21 ++++++++++----- .../processor/internals/StreamThread.java | 6 ++--- .../processor/internals/AbstractTaskTest.java | 2 +- .../processor/internals/AssignedTasksTest.java | 15 ++++++----- .../processor/internals/StreamTaskTest.java | 27 ++++++++++++++------ .../processor/internals/StreamThreadTest.java | 13 ++++++---- .../kafka/test/ProcessorTopologyTestDriver.java | 2 +- 12 files changed, 63 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 210c2bb..d2a84c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -306,8 +306,6 @@ public class MockProducer<K, V> implements Producer<K, V> { if (this.closed) { throw new IllegalStateException("MockProducer is already closed."); } - if (transactionInFlight) - abortTransaction(); this.closed = true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index eeb9b5f..a80e78d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -217,16 +217,6 @@ public class MockProducerTest { assertFalse(producer.transactionCommitted()); } - @Test - public void shouldAbortInFlightTransactionOnClose() { - producer.initTransactions(); - producer.beginTransaction(); - producer.close(); - assertFalse(producer.transactionInFlight()); - assertTrue(producer.transactionAborted()); - assertFalse(producer.transactionCommitted()); - } - @Test(expected = IllegalStateException.class) public void shouldThrowFenceProducerIfTransactionsNotInitialized() { producer.fenceProducer(); http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index b8a585a..fbb54ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -97,7 +97,8 @@ public abstract class AbstractTask { public abstract void commit(); public abstract void suspend(); - public abstract void close(final boolean clean); + public abstract void close(final boolean clean, + final boolean isZombie); public TaskId id() { return id; http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 6db212f..bffca37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -150,7 +150,7 @@ class AssignedTasks<T extends AbstractTask> { RuntimeException exception = null; for (final T task : tasks) { try { - task.close(false); + task.close(false, false); } catch (final RuntimeException e) { log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id, e); if (exception == null) { @@ -178,7 +178,7 @@ class AssignedTasks<T extends AbstractTask> { } catch (final RuntimeException e) { log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id, e); try { - task.close(false); + task.close(false, false); } catch (final Exception f) { log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id, f); } @@ -193,7 +193,7 @@ class AssignedTasks<T 
extends AbstractTask> { private void closeZombieTask(final T task) { log.warn("{} Producer of {} {} fenced; closing zombie task", logPrefix, taskTypeName, task.id); try { - task.close(false); + task.close(false, true); } catch (final Exception e) { log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", taskTypeName, logPrefix, e); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index bc36351..c8bce22 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -116,9 +116,11 @@ public class StandbyTask extends AbstractTask { * <pre> * @param clean ignored by {@code StandbyTask} as it can always try to close cleanly * (ie, commit, flush, and write checkpoint file) + * @param isZombie ignored by {@code StandbyTask} as it can never be a zombie */ @Override - public void close(final boolean clean) { + public void close(final boolean clean, + final boolean isZombie) { if (!taskInitialized) { return; } http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c5c67bb..149b938 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -393,7 +393,9 @@ public class StreamTask extends AbstractTask implements Punctuator { } // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed - void closeSuspended(boolean clean, RuntimeException firstException) { + void closeSuspended(boolean clean, + final boolean isZombie, + RuntimeException firstException) { try { closeStateManager(clean); } catch (final RuntimeException e) { @@ -411,14 +413,19 @@ public class StreamTask extends AbstractTask implements Punctuator { if (eosEnabled) { if (!clean) { try { - producer.abortTransaction(); - transactionInFlight = false; + if (!isZombie) { + producer.abortTransaction(); + } } catch (final ProducerFencedException e) { // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens + } finally { + transactionInFlight = false; } } try { - recordCollector.close(); + if (!isZombie) { + recordCollector.close(); + } } catch (final Throwable e) { log.error("{} Failed to close producer due to the following error:", logPrefix, e); } @@ -443,9 +450,11 @@ public class StreamTask extends AbstractTask implements Punctuator { * </pre> * @param clean shut down cleanly (ie, incl. 
flush and commit) if {@code true} -- * otherwise, just close open resources + * @param isZombie {@code true} if this task is a zombie */ @Override - public void close(boolean clean) { + public void close(boolean clean, + final boolean isZombie) { log.debug("{} Closing", logPrefix); RuntimeException firstException = null; @@ -457,7 +466,7 @@ public class StreamTask extends AbstractTask implements Punctuator { log.error("{} Could not close task due to the following error:", logPrefix, e); } - closeSuspended(clean, firstException); + closeSuspended(clean, isZombie, firstException); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 2d91e1b..40d741f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -982,7 +982,7 @@ public class StreamThread extends Thread { for (final AbstractTask task : allTasks()) { try { - task.close(cleanRun); + task.close(cleanRun, false); } catch (final RuntimeException e) { log.error("{} Failed while closing {} {} due to the following error:", logPrefix, @@ -1042,7 +1042,7 @@ public class StreamThread extends Thread { if (!task.partitions().equals(assignedPartitionsForTask)) { log.debug("{} Closing suspended and not re-assigned task {}", logPrefix, task.id()); try { - task.closeSuspended(true, null); + task.closeSuspended(true, false, null); } catch (final Exception e) { log.error("{} Failed to close suspended task {} due to the following error:", logPrefix, task.id, e); } finally { @@ -1060,7 +1060,7 @@ public class StreamThread extends Thread 
{ if (!newStandbyTaskIds.contains(task.id)) { log.debug("{} Closing suspended and not re-assigned standby task {}", logPrefix, task.id()); try { - task.close(true); + task.close(true, false); } catch (final Exception e) { log.error("{} Failed to remove suspended standby task {} due to the following error:", logPrefix, task.id(), e); } finally { http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 5e71f31..3e2fc39 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -133,7 +133,7 @@ public class AbstractTaskTest { public void suspend() {} @Override - public void close(final boolean clean) {} + public void close(final boolean clean, final boolean isZombie) {} @Override public boolean initialize() { http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java index 52d0ea8..4daff64 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java @@ -167,7 +167,7 @@ public class AssignedTasksTest { @Test public void shouldCloseUnInitializedTasksOnSuspend() { - t1.close(false); + 
t1.close(false, false); EasyMock.expectLastCall(); EasyMock.replay(t1); @@ -192,11 +192,12 @@ public class AssignedTasksTest { mockInitializedTask(); t1.suspend(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")); - t1.close(false); + t1.close(false, false); EasyMock.expectLastCall(); EasyMock.replay(t1); - assertThat(suspendTask(), not(nullValue())); + final RuntimeException expectedException = suspendTask(); + assertThat(expectedException, not(nullValue())); EasyMock.verify(t1); } @@ -205,7 +206,7 @@ public class AssignedTasksTest { mockInitializedTask(); t1.suspend(); EasyMock.expectLastCall().andThrow(new ProducerFencedException("KABOOM!")); - t1.close(false); + t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); @@ -253,7 +254,7 @@ public class AssignedTasksTest { mockInitializedTask(); t1.commit(); EasyMock.expectLastCall().andThrow(new ProducerFencedException("")); - t1.close(false); + t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); assignedTasks.addNewTask(t1); @@ -314,7 +315,7 @@ public class AssignedTasksTest { public void shouldCloseTaskOnProcessIfProducerFencedException() { mockInitializedTask(); EasyMock.expect(t1.process()).andThrow(new ProducerFencedException("")); - t1.close(false); + t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); @@ -397,7 +398,7 @@ public class AssignedTasksTest { public void shouldCloseTaskOnPunctuateAndCommitIfProducerFencedException() { mockInitializedTask(); EasyMock.expect(t1.maybePunctuate()).andThrow(new ProducerFencedException("")); - t1.close(false); + t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); assignedTasks.addNewTask(t1); http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index f4a0400..ee3f778 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -153,7 +153,7 @@ public class StreamTaskTest { public void cleanup() throws IOException { try { if (task != null) { - task.close(true); + task.close(true, false); } } finally { Utils.delete(baseDir); @@ -373,7 +373,7 @@ public class StreamTaskTest { Collections.<String, String>emptyMap(), Collections.<StateStore>emptyList()); - task.close(true); + task.close(true, false); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); @@ -611,11 +611,11 @@ public class StreamTaskTest { @SuppressWarnings("unchecked") @Test public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception { - task.close(true); + task.close(true, false); task = createTaskThatThrowsExceptionOnClose(); task.initialize(); try { - task.close(true); + task.close(true, false); fail("should have thrown runtime exception"); } catch (final RuntimeException e) { task = null; @@ -752,18 +752,29 @@ public class StreamTaskTest { task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); - task.close(false); + task.close(false, false); task = null; assertTrue(producer.transactionAborted()); } @Test + public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, 
stateDirectory, null, time, producer); + + task.close(false, true); + task = null; + assertFalse(producer.transactionAborted()); + } + + @Test public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); - task.close(false); + task.close(false, false); assertFalse(producer.transactionAborted()); } @@ -775,7 +786,7 @@ public class StreamTaskTest { task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); - task.close(true); + task.close(true, false); task = null; assertTrue(producer.closed()); } @@ -784,7 +795,7 @@ public class StreamTaskTest { public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() { final StreamTask task = createTaskThatThrowsExceptionOnClose(); try { - task.close(true); + task.close(true, false); } catch (Exception e) { fail("should have not closed unitialized topology"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index a3ffdda..07849c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -229,20 +229,23 @@ public class StreamThreadTest { } @Override - public void close(final boolean clean) { + public void close(final boolean clean, + final boolean isZombie) { if (closed && clean) { throw new 
IllegalStateException("Should not close task that is already closed."); } - super.close(clean); + super.close(clean, isZombie); closed = true; } @Override - public void closeSuspended(final boolean clean, final RuntimeException firstException) { + public void closeSuspended(final boolean clean, + final boolean isZombie, + final RuntimeException firstException) { if (closed && clean) { throw new IllegalStateException("Should not close task that is not suspended or already closed."); } - super.closeSuspended(clean, firstException); + super.closeSuspended(clean, isZombie, firstException); closed = true; } @@ -1371,7 +1374,7 @@ public class StreamThreadTest { changelogReader) { @Override - public void close(final boolean clean) { + public void close(final boolean clean, final boolean isZombie) { throw new RuntimeException("KABOOM!"); } }; http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 20ab3f1..fa09c01 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -389,7 +389,7 @@ public class ProcessorTopologyTestDriver { */ public void close() { if (task != null) { - task.close(true); + task.close(true, false); } if (globalStateTask != null) { try {