Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ad75a2fa1 -> 0827d2129


KAFKA-5427; Transactional producer should allow FindCoordinator in error state

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apu...@confluent.io>, 
Guozhang Wang <wangg...@gmail.com>

Closes #3297 from hachikuji/KAFKA-5427

(cherry picked from commit 43e935a630eb0a7fa64c5a1a38bfee17f9b724dc)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0827d212
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0827d212
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0827d212

Branch: refs/heads/0.11.0
Commit: 0827d2129d54d27c42d2314a82f076c6ec30d2df
Parents: ad75a2f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 12 15:04:05 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 15:05:53 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  |  32 +++--
 .../internals/TransactionManagerTest.java       | 124 ++++++++++++++++++-
 2 files changed, 133 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0827d212/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 821c56b..a26c3b7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -97,8 +97,7 @@ public class TransactionManager {
                 case INITIALIZING:
                     return source == UNINITIALIZED;
                 case READY:
-                    return source == INITIALIZING || source == 
COMMITTING_TRANSACTION
-                            || source == ABORTING_TRANSACTION || source == 
ABORTABLE_ERROR;
+                    return source == INITIALIZING || source == 
COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
                 case IN_TRANSACTION:
                     return source == READY;
                 case COMMITTING_TRANSACTION:
@@ -106,8 +105,7 @@ public class TransactionManager {
                 case ABORTING_TRANSACTION:
                     return source == IN_TRANSACTION || source == 
ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
-                    return source == IN_TRANSACTION || source == 
COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
-                            || source == ABORTABLE_ERROR;
+                    return source == IN_TRANSACTION || source == 
COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
                 case FATAL_ERROR:
                 default:
                     // We can transition to FATAL_ERROR unconditionally.
@@ -179,7 +177,7 @@ public class TransactionManager {
         ensureTransactional();
         maybeFailWithError();
         transitionTo(State.COMMITTING_TRANSACTION);
-        return beginCompletingTransaction(true);
+        return beginCompletingTransaction(TransactionResult.COMMIT);
     }
 
     public synchronized TransactionalRequestResult beginAbortingTransaction() {
@@ -190,14 +188,12 @@ public class TransactionManager {
 
         // We're aborting the transaction, so there should be no need to add 
new partitions
         newPartitionsInTransaction.clear();
-        return beginCompletingTransaction(false);
+        return beginCompletingTransaction(TransactionResult.ABORT);
     }
 
-    private TransactionalRequestResult beginCompletingTransaction(boolean 
isCommit) {
+    private TransactionalRequestResult 
beginCompletingTransaction(TransactionResult transactionResult) {
         if (!newPartitionsInTransaction.isEmpty())
             enqueueRequest(addPartitionsToTransactionHandler());
-
-        TransactionResult transactionResult = isCommit ? 
TransactionResult.COMMIT : TransactionResult.ABORT;
         EndTxnRequest.Builder builder = new 
EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                 producerIdAndEpoch.epoch, transactionResult);
         EndTxnHandler handler = new EndTxnHandler(builder);
@@ -225,7 +221,7 @@ public class TransactionManager {
         if (currentState != State.IN_TRANSACTION)
             throw new IllegalStateException("Cannot add partitions to a 
transaction in state " + currentState);
 
-        if (partitionsInTransaction.contains(topicPartition) || 
pendingPartitionsInTransaction.contains(topicPartition))
+        if (isPartitionAdded(topicPartition) || 
isPartitionPendingAdd(topicPartition))
             return;
 
         log.debug("{}Begin adding new partition {} to transaction", logPrefix, 
topicPartition);
@@ -286,6 +282,11 @@ public class TransactionManager {
     }
 
     synchronized void transitionToAbortableError(RuntimeException exception) {
+        if (currentState == State.ABORTING_TRANSACTION) {
+            log.debug("Skipping transition to abortable error state since the 
transaction is already being " +
+                    "aborted. Underlying exception: ", exception);
+            return;
+        }
         transitionTo(State.ABORTABLE_ERROR, exception);
     }
 
@@ -504,13 +505,10 @@ public class TransactionManager {
 
     private boolean maybeTerminateRequestWithError(TxnRequestHandler 
requestHandler) {
         if (hasError()) {
-            if (requestHandler instanceof EndTxnHandler) {
-                // we allow abort requests to break out of the error state. 
The state and the last error
-                // will be cleared when the request returns
-                EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
-                if (endTxnHandler.builder.result() == TransactionResult.ABORT)
-                    return false;
-            }
+            if (hasAbortableError() && requestHandler instanceof 
FindCoordinatorHandler)
+                // No harm letting the FindCoordinator request go through if 
we're expecting to abort
+                return false;
+
             requestHandler.fail(lastError);
             return true;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0827d212/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8d5dbe9..c4abd3c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -80,6 +80,7 @@ import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1059,6 +1060,71 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testAbortableErrorWhileAbortInProgress() throws 
InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        sender.run(time.milliseconds());  // Send Produce Request
+
+        TransactionalRequestResult abortResult = 
transactionManager.beginAbortingTransaction();
+        assertTrue(transactionManager.isAborting());
+        assertFalse(transactionManager.hasError());
+
+        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, 
epoch);
+        sender.run(time.milliseconds());  // receive the produce response
+
+        // we do not transition to ABORTABLE_ERROR since we were already 
aborting
+        assertTrue(transactionManager.isAborting());
+        assertFalse(transactionManager.hasError());
+
+        sender.run(time.milliseconds());  // handle the abort
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+        assertTrue(transactionManager.isReady());  // make sure we are ready 
for a transaction now.
+    }
+
+    @Test
+    public void testFindCoordinatorAllowedInAbortableErrorState() throws 
InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, 
time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, 
MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+
+        transactionManager.transitionToAbortableError(new KafkaException());
+        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, 
pid);
+        sender.run(time.milliseconds()); // AddPartitions returns
+        assertTrue(transactionManager.hasAbortableError());
+
+        
assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        sender.run(time.milliseconds()); // FindCoordinator handled
+        assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
     public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws 
InterruptedException {
         final long pid = 13131L;
         final short epoch = 1;
@@ -1279,16 +1345,43 @@ public class TransactionManagerTest {
 
         TransactionalRequestResult abortResult = 
transactionManager.beginAbortingTransaction();
 
-        prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 
consumerGroupId, pid, epoch);
+        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, 
consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
         assertFalse(abortResult.isCompleted());
 
         sender.run(time.milliseconds());
+        assertTrue(transactionManager.isReady());
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
     }
 
     @Test
+    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws 
Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp1, new OffsetAndMetadata(1));
+        final String consumerGroupId = "myconsumergroup";
+
+        transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+
+        TransactionalRequestResult abortResult = 
transactionManager.beginAbortingTransaction();
+
+        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, 
epoch);
+        sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());
+        assertTrue(abortResult.isCompleted());
+        assertFalse(abortResult.isSuccessful());
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
     public void testNoDrainWhenPartitionsPending() throws InterruptedException 
{
         final long pid = 13131L;
         final short epoch = 1;
@@ -1623,8 +1716,15 @@ public class TransactionManagerTest {
         }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
+    private void sendProduceResponse(Errors error, final long pid, final short 
epoch) {
+        client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 
0, error, 0));
+    }
+
     private void prepareProduceResponse(Errors error, final long pid, final 
short epoch) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
+        client.prepareResponse(produceRequestMatcher(pid, epoch), 
produceResponse(tp0, 0, error, 0));
+    }
+    private MockClient.RequestMatcher produceRequestMatcher(final long pid, 
final short epoch) {
+        return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
                 ProduceRequest produceRequest = (ProduceRequest) body;
@@ -1640,12 +1740,24 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, 
produceRequest.transactionalId());
                 return true;
             }
-        }, produceResponse(tp0, 0, error, 0));
+        };
+    }
 
+    private void prepareAddPartitionsToTxnResponse(Errors error, final 
TopicPartition topicPartition,
+                                                   final short epoch, final 
long pid) {
+        client.prepareResponse(addPartitionsRequestMatcher(topicPartition, 
epoch, pid),
+                new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, 
error)));
     }
 
-    private void prepareAddPartitionsToTxnResponse(Errors error, final 
TopicPartition topicPartition, final short epoch, final long pid) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
+    private void sendAddPartitionsToTxnResponse(Errors error, final 
TopicPartition topicPartition,
+                                                final short epoch, final long 
pid) {
+        client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid),
+                new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, 
error)));
+    }
+
+    private MockClient.RequestMatcher addPartitionsRequestMatcher(final 
TopicPartition topicPartition,
+                                                                  final short 
epoch, final long pid) {
+        return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
                 AddPartitionsToTxnRequest addPartitionsToTxnRequest = 
(AddPartitionsToTxnRequest) body;
@@ -1655,7 +1767,7 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, 
addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
-        }, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, 
error)));
+        };
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult 
result, final long pid, final short epoch) {

Reply via email to