Repository: kafka
Updated Branches:
  refs/heads/trunk 2724053ad -> 43e935a63
KAFKA-5427; Transactional producer should allow FindCoordinator in error state Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Apurva Mehta <apu...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3297 from hachikuji/KAFKA-5427 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43e935a6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43e935a6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43e935a6 Branch: refs/heads/trunk Commit: 43e935a630eb0a7fa64c5a1a38bfee17f9b724dc Parents: 2724053 Author: Jason Gustafson <ja...@confluent.io> Authored: Mon Jun 12 15:04:05 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Jun 12 15:04:05 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/TransactionManager.java | 32 +++-- .../internals/TransactionManagerTest.java | 124 ++++++++++++++++++- 2 files changed, 133 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/43e935a6/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 821c56b..a26c3b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -97,8 +97,7 @@ public class TransactionManager { case INITIALIZING: return source == UNINITIALIZED; case READY: - return source == INITIALIZING || source == COMMITTING_TRANSACTION - || source == ABORTING_TRANSACTION || source == 
ABORTABLE_ERROR; + return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; case IN_TRANSACTION: return source == READY; case COMMITTING_TRANSACTION: @@ -106,8 +105,7 @@ public class TransactionManager { case ABORTING_TRANSACTION: return source == IN_TRANSACTION || source == ABORTABLE_ERROR; case ABORTABLE_ERROR: - return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION - || source == ABORTABLE_ERROR; + return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR; case FATAL_ERROR: default: // We can transition to FATAL_ERROR unconditionally. @@ -179,7 +177,7 @@ public class TransactionManager { ensureTransactional(); maybeFailWithError(); transitionTo(State.COMMITTING_TRANSACTION); - return beginCompletingTransaction(true); + return beginCompletingTransaction(TransactionResult.COMMIT); } public synchronized TransactionalRequestResult beginAbortingTransaction() { @@ -190,14 +188,12 @@ public class TransactionManager { // We're aborting the transaction, so there should be no need to add new partitions newPartitionsInTransaction.clear(); - return beginCompletingTransaction(false); + return beginCompletingTransaction(TransactionResult.ABORT); } - private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) { + private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) { if (!newPartitionsInTransaction.isEmpty()) enqueueRequest(addPartitionsToTransactionHandler()); - - TransactionResult transactionResult = isCommit ? 
TransactionResult.COMMIT : TransactionResult.ABORT; EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, transactionResult); EndTxnHandler handler = new EndTxnHandler(builder); @@ -225,7 +221,7 @@ public class TransactionManager { if (currentState != State.IN_TRANSACTION) throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState); - if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition)) + if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) return; log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition); @@ -286,6 +282,11 @@ public class TransactionManager { } synchronized void transitionToAbortableError(RuntimeException exception) { + if (currentState == State.ABORTING_TRANSACTION) { + log.debug("Skipping transition to abortable error state since the transaction is already being " + + "aborted. Underlying exception: ", exception); + return; + } transitionTo(State.ABORTABLE_ERROR, exception); } @@ -504,13 +505,10 @@ public class TransactionManager { private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) { if (hasError()) { - if (requestHandler instanceof EndTxnHandler) { - // we allow abort requests to break out of the error state. 
The state and the last error - // will be cleared when the request returns - EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler; - if (endTxnHandler.builder.result() == TransactionResult.ABORT) - return false; - } + if (hasAbortableError() && requestHandler instanceof FindCoordinatorHandler) + // No harm letting the FindCoordinator request go through if we're expecting to abort + return false; + requestHandler.fail(lastError); return true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/43e935a6/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8d5dbe9..c4abd3c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -80,6 +80,7 @@ import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1059,6 +1060,71 @@ public class TransactionManagerTest { } @Test + public void testAbortableErrorWhileAbortInProgress() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + 
assertFalse(responseFuture.isDone()); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + + sender.run(time.milliseconds()); // Send AddPartitionsRequest + sender.run(time.milliseconds()); // Send Produce Request + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + assertTrue(transactionManager.isAborting()); + assertFalse(transactionManager.hasError()); + + sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); + sender.run(time.milliseconds()); // receive the produce response + + // we do not transition to ABORTABLE_ERROR since we were already aborting + assertTrue(transactionManager.isAborting()); + assertFalse(transactionManager.hasError()); + + sender.run(time.milliseconds()); // handle the abort + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. 
+ } + + @Test + public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + assertFalse(responseFuture.isDone()); + sender.run(time.milliseconds()); // Send AddPartitionsRequest + + transactionManager.transitionToAbortableError(new KafkaException()); + sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid); + sender.run(time.milliseconds()); // AddPartitions returns + assertTrue(transactionManager.hasAbortableError()); + + assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION)); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // FindCoordinator handled + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + assertTrue(transactionManager.hasAbortableError()); + } + + @Test public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException { final long pid = 13131L; final short epoch = 1; @@ -1279,16 +1345,43 @@ public class TransactionManagerTest { TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); - prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch); + prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch); sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest assertFalse(abortResult.isCompleted()); sender.run(time.milliseconds()); + assertTrue(transactionManager.isReady()); assertTrue(abortResult.isCompleted()); assertTrue(abortResult.isSuccessful()); } @Test + 
public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + offsets.put(tp1, new OffsetAndMetadata(1)); + final String consumerGroupId = "myconsumergroup"; + + transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId); + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + + prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest + assertFalse(abortResult.isCompleted()); + + sender.run(time.milliseconds()); + assertTrue(abortResult.isCompleted()); + assertFalse(abortResult.isSuccessful()); + assertTrue(transactionManager.hasFatalError()); + } + + @Test public void testNoDrainWhenPartitionsPending() throws InterruptedException { final long pid = 13131L; final short epoch = 1; @@ -1623,8 +1716,15 @@ public class TransactionManagerTest { }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect); } + private void sendProduceResponse(Errors error, final long pid, final short epoch) { + client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0)); + } + private void prepareProduceResponse(Errors error, final long pid, final short epoch) { - client.prepareResponse(new MockClient.RequestMatcher() { + client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0)); + } + private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) { + return new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { ProduceRequest produceRequest = (ProduceRequest) body; @@ -1640,12 +1740,24 @@ public class TransactionManagerTest { assertEquals(transactionalId, produceRequest.transactionalId()); return true; } - 
}, produceResponse(tp0, 0, error, 0)); + }; + } + private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, + final short epoch, final long pid) { + client.prepareResponse(addPartitionsRequestMatcher(topicPartition, epoch, pid), + new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error))); } - private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, final short epoch, final long pid) { - client.prepareResponse(new MockClient.RequestMatcher() { + private void sendAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, + final short epoch, final long pid) { + client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid), + new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error))); + } + + private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition, + final short epoch, final long pid) { + return new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body; @@ -1655,7 +1767,7 @@ public class TransactionManagerTest { assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId()); return true; } - }, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error))); + }; } private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {