Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d77c057dd -> b872abf69


KAFKA-5603; Don't abort TX for zombie tasks

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Bill Bejeck <b...@confluent.io>, Damian Guy <damian....@gmail.com>

Closes #3722 from mjsax/kafka-5603-dont-abort-tx-for-zombie-tasks-01101


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b872abf6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b872abf6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b872abf6

Branch: refs/heads/0.11.0
Commit: b872abf69b1308da57ad7ad24e9cffe3e5421128
Parents: d77c057
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Thu Aug 24 10:47:50 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Thu Aug 24 10:47:50 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    |  2 --
 .../clients/producer/MockProducerTest.java      | 10 --------
 .../processor/internals/AbstractTask.java       |  3 ++-
 .../processor/internals/AssignedTasks.java      |  6 ++---
 .../processor/internals/StandbyTask.java        |  4 ++-
 .../streams/processor/internals/StreamTask.java | 21 ++++++++++-----
 .../processor/internals/StreamThread.java       |  6 ++---
 .../processor/internals/AbstractTaskTest.java   |  2 +-
 .../processor/internals/AssignedTasksTest.java  | 15 ++++++-----
 .../processor/internals/StreamTaskTest.java     | 27 ++++++++++++++------
 .../processor/internals/StreamThreadTest.java   | 13 ++++++----
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 12 files changed, 63 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 210c2bb..d2a84c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -306,8 +306,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
         }
-        if (transactionInFlight)
-            abortTransaction();
         this.closed = true;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index eeb9b5f..a80e78d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -217,16 +217,6 @@ public class MockProducerTest {
         assertFalse(producer.transactionCommitted());
     }
 
-    @Test
-    public void shouldAbortInFlightTransactionOnClose() {
-        producer.initTransactions();
-        producer.beginTransaction();
-        producer.close();
-        assertFalse(producer.transactionInFlight());
-        assertTrue(producer.transactionAborted());
-        assertFalse(producer.transactionCommitted());
-    }
-
     @Test(expected = IllegalStateException.class)
     public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
         producer.fenceProducer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b8a585a..fbb54ab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -97,7 +97,8 @@ public abstract class AbstractTask {
 
     public abstract void commit();
     public abstract void suspend();
-    public abstract void close(final boolean clean);
+    public abstract void close(final boolean clean,
+                               final boolean isZombie);
 
     public TaskId id() {
         return id;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6db212f..bffca37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -150,7 +150,7 @@ class AssignedTasks<T extends AbstractTask> {
         RuntimeException exception = null;
         for (final T task : tasks) {
             try {
-                task.close(false);
+                task.close(false, false);
             } catch (final RuntimeException e) {
                 log.error("{} Failed to close {}, {}", logPrefix, 
taskTypeName, task.id, e);
                 if (exception == null) {
@@ -178,7 +178,7 @@ class AssignedTasks<T extends AbstractTask> {
             } catch (final RuntimeException e) {
                 log.error("{} Suspending {} {} failed due to the following 
error:", logPrefix, taskTypeName, task.id, e);
                 try {
-                    task.close(false);
+                    task.close(false, false);
                 } catch (final Exception f) {
                     log.error("{} After suspending failed, closing the same {} 
{} failed again due to the following error:", logPrefix, taskTypeName, task.id, 
f);
                 }
@@ -193,7 +193,7 @@ class AssignedTasks<T extends AbstractTask> {
     private void closeZombieTask(final T task) {
         log.warn("{} Producer of {} {} fenced; closing zombie task", 
logPrefix, taskTypeName, task.id);
         try {
-            task.close(false);
+            task.close(false, true);
         } catch (final Exception e) {
             log.warn("{} Failed to close zombie {} due to {}, ignore and 
proceed", taskTypeName, logPrefix, e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index bc36351..c8bce22 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -116,9 +116,11 @@ public class StandbyTask extends AbstractTask {
      * <pre>
      * @param clean ignored by {@code StandbyTask} as it can always try to 
close cleanly
      *              (ie, commit, flush, and write checkpoint file)
+     * @param isZombie ignored by {@code StandbyTask} as it can never be a 
zombie
      */
     @Override
-    public void close(final boolean clean) {
+    public void close(final boolean clean,
+                      final boolean isZombie) {
         if (!taskInitialized) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c5c67bb..149b938 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -393,7 +393,9 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
     }
 
     // helper to avoid calling suspend() twice if a suspended task is not 
reassigned and closed
-    void closeSuspended(boolean clean, RuntimeException firstException) {
+    void closeSuspended(boolean clean,
+                        final boolean isZombie,
+                        RuntimeException firstException) {
         try {
             closeStateManager(clean);
         } catch (final RuntimeException e) {
@@ -411,14 +413,19 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
             if (eosEnabled) {
                 if (!clean) {
                     try {
-                        producer.abortTransaction();
-                        transactionInFlight = false;
+                        if (!isZombie) {
+                            producer.abortTransaction();
+                        }
                     } catch (final ProducerFencedException e) {
                         // can be ignored: transaction got already aborted by 
brokers/transactional-coordinator if this happens
+                    } finally {
+                        transactionInFlight = false;
                     }
                 }
                 try {
-                    recordCollector.close();
+                    if (!isZombie) {
+                        recordCollector.close();
+                    }
                 } catch (final Throwable e) {
                     log.error("{} Failed to close producer due to the 
following error:", logPrefix, e);
                 }
@@ -443,9 +450,11 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
      * </pre>
      * @param clean shut down cleanly (ie, incl. flush and commit) if {@code 
true} --
      *              otherwise, just close open resources
+     * @param isZombie {@code true} if this task is a zombie
      */
     @Override
-    public void close(boolean clean) {
+    public void close(boolean clean,
+                      final boolean isZombie) {
         log.debug("{} Closing", logPrefix);
 
         RuntimeException firstException = null;
@@ -457,7 +466,7 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
             log.error("{} Could not close task due to the following error:", 
logPrefix, e);
         }
 
-        closeSuspended(clean, firstException);
+        closeSuspended(clean, isZombie, firstException);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2d91e1b..40d741f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -982,7 +982,7 @@ public class StreamThread extends Thread {
 
         for (final AbstractTask task : allTasks()) {
             try {
-                task.close(cleanRun);
+                task.close(cleanRun, false);
             } catch (final RuntimeException e) {
                 log.error("{} Failed while closing {} {} due to the following 
error:",
                           logPrefix,
@@ -1042,7 +1042,7 @@ public class StreamThread extends Thread {
             if (!task.partitions().equals(assignedPartitionsForTask)) {
                 log.debug("{} Closing suspended and not re-assigned task {}", 
logPrefix, task.id());
                 try {
-                    task.closeSuspended(true, null);
+                    task.closeSuspended(true, false, null);
                 } catch (final Exception e) {
                     log.error("{} Failed to close suspended task {} due to the 
following error:", logPrefix, task.id, e);
                 } finally {
@@ -1060,7 +1060,7 @@ public class StreamThread extends Thread {
             if (!newStandbyTaskIds.contains(task.id)) {
                 log.debug("{} Closing suspended and not re-assigned standby 
task {}", logPrefix, task.id());
                 try {
-                    task.close(true);
+                    task.close(true, false);
                 } catch (final Exception e) {
                     log.error("{} Failed to remove suspended standby task {} 
due to the following error:", logPrefix, task.id(), e);
                 } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 5e71f31..3e2fc39 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -133,7 +133,7 @@ public class AbstractTaskTest {
             public void suspend() {}
 
             @Override
-            public void close(final boolean clean) {}
+            public void close(final boolean clean, final boolean isZombie) {}
 
             @Override
             public boolean initialize() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 52d0ea8..4daff64 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -167,7 +167,7 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldCloseUnInitializedTasksOnSuspend() {
-        t1.close(false);
+        t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -192,11 +192,12 @@ public class AssignedTasksTest {
         mockInitializedTask();
         t1.suspend();
         EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
-        t1.close(false);
+        t1.close(false, false);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
-        assertThat(suspendTask(), not(nullValue()));
+        final RuntimeException expectedException = suspendTask();
+        assertThat(expectedException, not(nullValue()));
         EasyMock.verify(t1);
     }
 
@@ -205,7 +206,7 @@ public class AssignedTasksTest {
         mockInitializedTask();
         t1.suspend();
         EasyMock.expectLastCall().andThrow(new 
ProducerFencedException("KABOOM!"));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -253,7 +254,7 @@ public class AssignedTasksTest {
         mockInitializedTask();
         t1.commit();
         EasyMock.expectLastCall().andThrow(new ProducerFencedException(""));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
         assignedTasks.addNewTask(t1);
@@ -314,7 +315,7 @@ public class AssignedTasksTest {
     public void shouldCloseTaskOnProcessIfProducerFencedException() {
         mockInitializedTask();
         EasyMock.expect(t1.process()).andThrow(new 
ProducerFencedException(""));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
 
@@ -397,7 +398,7 @@ public class AssignedTasksTest {
     public void shouldCloseTaskOnPunctuateAndCommitIfProducerFencedException() 
{
         mockInitializedTask();
         EasyMock.expect(t1.maybePunctuate()).andThrow(new 
ProducerFencedException(""));
-        t1.close(false);
+        t1.close(false, true);
         EasyMock.expectLastCall();
         EasyMock.replay(t1);
         assignedTasks.addNewTask(t1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f4a0400..ee3f778 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -153,7 +153,7 @@ public class StreamTaskTest {
     public void cleanup() throws IOException {
         try {
             if (task != null) {
-                task.close(true);
+                task.close(true, false);
             }
         } finally {
             Utils.delete(baseDir);
@@ -373,7 +373,7 @@ public class StreamTaskTest {
                                                                  
Collections.<String, String>emptyMap(),
                                                                  
Collections.<StateStore>emptyList());
 
-        task.close(true);
+        task.close(true, false);
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -611,11 +611,11 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void 
shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology()
 throws Exception {
-        task.close(true);
+        task.close(true, false);
         task = createTaskThatThrowsExceptionOnClose();
         task.initialize();
         try {
-            task.close(true);
+            task.close(true, false);
             fail("should have thrown runtime exception");
         } catch (final RuntimeException e) {
             task = null;
@@ -752,18 +752,29 @@ public class StreamTaskTest {
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
-        task.close(false);
+        task.close(false, false);
         task = null;
         assertTrue(producer.transactionAborted());
     }
 
     @Test
+    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws 
Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.close(false, true);
+        task = null;
+        assertFalse(producer.transactionAborted());
+    }
+
+    @Test
     public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws 
Exception {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
-        task.close(false);
+        task.close(false, false);
         assertFalse(producer.transactionAborted());
     }
 
@@ -775,7 +786,7 @@ public class StreamTaskTest {
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer,
             changelogReader, eosConfig, streamsMetrics, stateDirectory, null, 
time, producer);
 
-        task.close(true);
+        task.close(true, false);
         task = null;
         assertTrue(producer.closed());
     }
@@ -784,7 +795,7 @@ public class StreamTaskTest {
     public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
         final StreamTask task = createTaskThatThrowsExceptionOnClose();
         try {
-            task.close(true);
+            task.close(true, false);
         } catch (Exception e) {
             fail("should have not closed unitialized topology");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a3ffdda..07849c3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -229,20 +229,23 @@ public class StreamThreadTest {
         }
 
         @Override
-        public void close(final boolean clean) {
+        public void close(final boolean clean,
+                          final boolean isZombie) {
             if (closed && clean) {
                 throw new IllegalStateException("Should not close task that is 
already closed.");
             }
-            super.close(clean);
+            super.close(clean, isZombie);
             closed = true;
         }
 
         @Override
-        public void closeSuspended(final boolean clean, final RuntimeException 
firstException) {
+        public void closeSuspended(final boolean clean,
+                                   final boolean isZombie,
+                                   final RuntimeException firstException) {
             if (closed && clean) {
                 throw new IllegalStateException("Should not close task that is 
not suspended or already closed.");
             }
-            super.closeSuspended(clean, firstException);
+            super.closeSuspended(clean, isZombie, firstException);
             closed = true;
         }
 
@@ -1371,7 +1374,7 @@ public class StreamThreadTest {
                 changelogReader) {
 
             @Override
-            public void close(final boolean clean) {
+            public void close(final boolean clean, final boolean isZombie) {
                 throw new RuntimeException("KABOOM!");
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/b872abf6/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 20ab3f1..fa09c01 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -389,7 +389,7 @@ public class ProcessorTopologyTestDriver {
      */
     public void close() {
         if (task != null) {
-            task.close(true);
+            task.close(true, false);
         }
         if (globalStateTask != null) {
             try {

Reply via email to