Repository: kafka Updated Branches: refs/heads/trunk 71fe23b44 -> 3c9e30a2f
MINOR: Tighten up locking when aborting expired transactions This is a followup to #4137 Author: Apurva Mehta <apu...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #4146 from apurvam/MINOR-followups-to-bump-epoch-on-expire-patch Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c9e30a2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c9e30a2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c9e30a2 Branch: refs/heads/trunk Commit: 3c9e30a2f71c83b7efd45a65ffb5df5a80f48d19 Parents: 71fe23b Author: Apurva Mehta <apu...@confluent.io> Authored: Tue Oct 31 09:57:05 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Oct 31 09:57:05 2017 -0700 ---------------------------------------------------------------------- .../transaction/TransactionCoordinator.scala | 69 +++++++++----------- .../kafka/api/TransactionsTest.scala | 4 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 14 +++- 3 files changed, 45 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3c9e30a2/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index b307a39..6ad1f40 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -320,8 +320,7 @@ class TransactionCoordinator(brokerId: Int, else PrepareAbort - if (nextState == PrepareAbort && txnMetadata.pendingState.isDefined - && txnMetadata.pendingState.get == PrepareEpochFence) { + if (nextState == PrepareAbort 
&& txnMetadata.pendingState.contains(PrepareEpochFence)) { // We should clear the pending state to make way for the transition to PrepareAbort and also bump // the epoch in the transaction metadata we are about to append. txnMetadata.pendingState = None @@ -454,42 +453,38 @@ class TransactionCoordinator(brokerId: Int, case Some(epochAndTxnMetadata) => val txnMetadata = epochAndTxnMetadata.transactionMetadata - val producerIdHasChanged = txnMetadata.inLock { - txnMetadata.producerId != txnIdAndPidEpoch.producerId - } - if (producerIdHasChanged) { - error(s"Found incorrect producerId when expiring transactionalId: ${txnIdAndPidEpoch.transactionalId}. " + - s"Expected producerId: ${txnIdAndPidEpoch.producerId}. Found producerId: " + - s"${epochAndTxnMetadata.transactionMetadata.producerId}") - Left(Errors.INVALID_PRODUCER_ID_MAPPING) - } else { - val transitMetadata: Either[Errors, TxnTransitMetadata] = txnMetadata.inLock { - if (txnMetadata.pendingTransitionInProgress) - Left(Errors.CONCURRENT_TRANSACTIONS) - else - Right(txnMetadata.prepareFenceProducerEpoch()) - } - transitMetadata match { - case Right(txnTransitMetadata) => - handleEndTransaction(txnMetadata.transactionalId, - txnTransitMetadata.producerId, - txnTransitMetadata.producerEpoch, - TransactionResult.ABORT, - { - case Errors.NONE => - info(s"Completed rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout") - case e @ (Errors.INVALID_PRODUCER_ID_MAPPING | - Errors.INVALID_PRODUCER_EPOCH | - Errors.CONCURRENT_TRANSACTIONS) => - debug(s"Rolling back ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} has aborted due to ${e.exceptionName}") - case e => - warn(s"Rolling back ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} failed due to ${e.exceptionName}") - }) - Right(txnTransitMetadata) - case (error) => - Left(error) + val transitMetadata = txnMetadata.inLock { + if (txnMetadata.producerId != 
txnIdAndPidEpoch.producerId) { + error(s"Found incorrect producerId when expiring transactionalId: ${txnIdAndPidEpoch.transactionalId}. " + + s"Expected producerId: ${txnIdAndPidEpoch.producerId}. Found producerId: " + + s"${txnMetadata.producerId}") + Left(Errors.INVALID_PRODUCER_ID_MAPPING) + } else if (txnMetadata.pendingTransitionInProgress) { + Left(Errors.CONCURRENT_TRANSACTIONS) + } else { + Right(txnMetadata.prepareFenceProducerEpoch()) } - } + } + transitMetadata match { + case Right(txnTransitMetadata) => + handleEndTransaction(txnMetadata.transactionalId, + txnTransitMetadata.producerId, + txnTransitMetadata.producerEpoch, + TransactionResult.ABORT, + { + case Errors.NONE => + info(s"Completed rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout") + case e @ (Errors.INVALID_PRODUCER_ID_MAPPING | + Errors.INVALID_PRODUCER_EPOCH | + Errors.CONCURRENT_TRANSACTIONS) => + debug(s"Rolling back ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} has aborted due to ${e.exceptionName}") + case e => + warn(s"Rolling back ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} failed due to ${e.exceptionName}") + }) + Right(txnTransitMetadata) + case (error) => + Left(error) + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c9e30a2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 3eee7f1..bb6b520 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -482,14 +482,14 @@ class TransactionsTest extends KafkaServerTestHarness { // Verify that the first message was aborted and the second one was never written at all. 
val nonTransactionalConsumer = nonTransactionalConsumers(0) nonTransactionalConsumer.subscribe(List(topic1).asJava) - val records = TestUtils.consumeRemainingRecords(nonTransactionalConsumer, 1000) + val records = TestUtils.consumeRecordsFor(nonTransactionalConsumer, 1000) assertEquals(1, records.size) assertEquals("1", TestUtils.recordValueAsString(records.head)) val transactionalConsumer = transactionalConsumers.head transactionalConsumer.subscribe(List(topic1).asJava) - val transactionalRecords = TestUtils.consumeRemainingRecords(transactionalConsumer, 1000) + val transactionalRecords = TestUtils.consumeRecordsFor(transactionalConsumer, 1000) assertTrue(transactionalRecords.isEmpty) } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c9e30a2/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 974a493..a99cd50 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1382,13 +1382,21 @@ object TestUtils extends Logging { records } - def consumeRemainingRecords[K, V](consumer: KafkaConsumer[K, V], timeout: Long): Seq[ConsumerRecord[K, V]] = { + /** + * Will consume all the records for the given consumer for the specified duration. If you want to drain all the + * remaining messages in the partitions the consumer is subscribed to, the duration should be set high enough so + * that the consumer has enough time to poll everything. This would be based on the number of expected messages left + * in the topic, and should not be too large (i.e., more than a second) in our tests. + * + * @return All the records consumed by the consumer within the specified duration.
+ */ + def consumeRecordsFor[K, V](consumer: KafkaConsumer[K, V], duration: Long): Seq[ConsumerRecord[K, V]] = { val startTime = System.currentTimeMillis() val records = new ArrayBuffer[ConsumerRecord[K, V]]() waitUntilTrue(() => { records ++= consumer.poll(50).asScala - System.currentTimeMillis() - startTime > timeout - }, s"The timeout $timeout was greater than the maximum wait time.") + System.currentTimeMillis() - startTime > duration + }, s"The timeout $duration was greater than the maximum wait time.") records }