Repository: kafka Updated Branches: refs/heads/0.11.0 26c6a0715 -> f6bcb8472
KAFKA-5416; TC should not reset pending state if log append is retried. In `TransactionStateManager`, we reset the pending state if an error occurred while appending to the log; this is correct except for the `TransactionMarkerChannelManager`: it will retry appending to the log, and if it eventually succeeds, the transaction metadata's completing transition will throw an IllegalStateException since the pending state is None; this exception will be thrown all the way up to `KafkaApis` and be swallowed. 1. Do not reset the pending state if the append will be retried (as is the case when writing the complete transition). 2. A bunch of log4j improvements based on the debugging experience. The main principle is to make sure all error codes that are about to be sent to the client are logged, and unnecessary log4j entries are removed. 3. Also moved some log entries in ReplicationUtils.scala to `trace`: this is rather orthogonal to this PR, but I found them rather annoying while debugging the logs. 4. A couple of unrelated bug fixes as pointed out by hachikuji and apurvam. 
Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Apurva Mehta <apu...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #3287 from guozhangwang/KHotfix-transaction-coordinator-append-callback (cherry picked from commit 9d6f0f40ceb4859d54454e03f6abfe8b67963e83) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6bcb847 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6bcb847 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6bcb847 Branch: refs/heads/0.11.0 Commit: f6bcb84727ea8710392ce3d0960a6b511dc3d907 Parents: 26c6a07 Author: Guozhang Wang <wangg...@gmail.com> Authored: Mon Jun 12 12:47:42 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Jun 12 12:50:16 2017 -0700 ---------------------------------------------------------------------- .../transaction/TransactionCoordinator.scala | 43 +++-- .../TransactionMarkerChannelManager.scala | 59 +++++-- ...nsactionMarkerRequestCompletionHandler.scala | 3 + .../transaction/TransactionMetadata.scala | 26 +-- .../transaction/TransactionStateManager.scala | 112 +++++++----- .../scala/kafka/utils/ReplicationUtils.scala | 14 +- .../TransactionCoordinatorTest.scala | 24 ++- .../TransactionMarkerChannelManagerTest.scala | 177 ++++++++++++++++++- .../TransactionStateManagerTest.scala | 76 +++++++- 9 files changed, 419 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 040ab38..7058de6 100644 --- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -138,7 +138,9 @@ class TransactionCoordinator(brokerId: Int, } case Right(None) => - throw new IllegalStateException("Trying to add metadata to the cache still returns NONE; this is not expected") + val errorMsg = "Trying to add metadata to the cache still returns NONE; this is not expected" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } result match { @@ -169,6 +171,7 @@ class TransactionCoordinator(brokerId: Int, s"${Topic.TRANSACTION_STATE_TOPIC_NAME}-${txnManager.partitionFor(transactionalId)}") responseCallback(initTransactionMetadata(newMetadata)) } else { + info(s"Returning $error error code to client for $transactionalId's InitProducerId request") responseCallback(initTransactionError(error)) } } @@ -211,8 +214,11 @@ class TransactionCoordinator(brokerId: Int, // then when the client retries, we will generate a new producerId. Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch()) case Dead => - throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + - s"This is illegal as we should never have transitioned to this state.") + val errorMsg = s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + + s"This is illegal as we should never have transitioned to this state." 
+ fatal(errorMsg) + throw new IllegalStateException(errorMsg) + } } } @@ -223,6 +229,7 @@ class TransactionCoordinator(brokerId: Int, partitions: collection.Set[TopicPartition], responseCallback: AddPartitionsCallback): Unit = { if (transactionalId == null || transactionalId.isEmpty) { + debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request") responseCallback(Errors.INVALID_REQUEST) } else { // try to update the transaction metadata and append the updated metadata to txn log; @@ -260,6 +267,7 @@ class TransactionCoordinator(brokerId: Int, result match { case Left(err) => + info(s"Returning $err error code to client for $transactionalId's AddPartitions request") responseCallback(err) case Right((coordinatorEpoch, newMetadata)) => @@ -341,8 +349,10 @@ class TransactionCoordinator(brokerId: Int, case Empty => logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case Dead => - throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + - s"This is illegal as we should never have transitioned to this state.") + val errorMsg = s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + + s"This is illegal as we should never have transitioned to this state." + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } } @@ -350,6 +360,7 @@ class TransactionCoordinator(brokerId: Int, preAppendResult match { case Left(err) => + info(s"Aborting append of $txnMarkerResult to transaction log with coordinator and returning $err error to client for $transactionalId's EndTransaction request") responseCallback(err) case Right((coordinatorEpoch, newMetadata)) => @@ -383,25 +394,31 @@ class TransactionCoordinator(brokerId: Int, else Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) case Dead => - throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. 
" + - s"This is illegal as we should never have transitioned to this state.") + val errorMsg = s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + + s"This is illegal as we should never have transitioned to this state." + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } } } else { - info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed since the transaction coordinator epoch " + - s"has been changed to ${epochAndMetadata.coordinatorEpoch} after the transaction metadata has been successfully appended to the log") + debug(s"The transaction coordinator epoch has changed to ${epochAndMetadata.coordinatorEpoch} after $txnMarkerResult was " + + s"successfully appended to the log for $transactionalId with old epoch $coordinatorEpoch") Left(Errors.NOT_COORDINATOR) } case Right(None) => - throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " + - s"no metadata in the cache; this is not expected") + val errorMsg = s"The coordinator still owns the transaction partition for $transactionalId, but there is " + + s"no metadata in the cache; this is not expected" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } preSendResult match { case Left(err) => + info(s"Aborting sending of transaction markers after appended $txnMarkerResult to transaction log and returning $err error to client for $transactionalId's EndTransaction request") + responseCallback(err) case Right((txnMetadata, newPreSendMetadata)) => @@ -412,8 +429,8 @@ class TransactionCoordinator(brokerId: Int, txnMarkerChannelManager.addTxnMarkersToSend(transactionalId, coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata) } } else { - info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed since the transaction message " + - 
s"cannot be appended to the log. Returning error code $error to the client") + info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " + + s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed") responseCallback(error) } http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 63a382f..7b923e6 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -130,6 +130,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig, txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], time: Time) extends Logging with KafkaMetricsGroup { + this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: " + private val interBrokerListenerName: ListenerName = config.interBrokerListenerName private val txnMarkerSendThread: InterBrokerSendThread = { @@ -197,8 +199,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig, def retryLogAppends(): Unit = { val txnLogAppendRetries: java.util.List[TxnLogAppend] = new util.ArrayList[TxnLogAppend]() txnLogAppendRetryQueue.drainTo(txnLogAppendRetries) - debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends") txnLogAppendRetries.asScala.foreach { txnLogAppend => + debug(s"Retry appending $txnLogAppend transaction log") tryAppendToLog(txnLogAppend) } } @@ -254,25 +256,29 @@ class TransactionMarkerChannelManager(config: KafkaConfig, case 
Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) => info(s"I am loading the transaction partition that contains $transactionalId while my current coordinator epoch is $coordinatorEpoch; " + - s"so appending $newMetadata to transaction log since the loading process will continue the left work") + s"so cancel appending $newMetadata to transaction log since the loading process will continue the remaining work") case Right(Some(epochAndMetadata)) => if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) { - debug(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded") + debug(s"Sending $transactionalId's transaction markers for $txnMetadata with coordinator epoch $coordinatorEpoch succeeded, trying to append complete transaction log now") tryAppendToLog(TxnLogAppend(transactionalId, coordinatorEpoch, txnMetadata, newMetadata)) } else { - info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction markers " + - s"has been sent to brokers. 
The cached metadata have been changed to $epochAndMetadata since preparing to send markers") + info(s"The cached metadata $txnMetadata has changed to $epochAndMetadata after completed sending the markers with coordinator " + + s"epoch $coordinatorEpoch; abort transiting the metadata to $newMetadata as it may have been updated by another process") } case Right(None) => - throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " + - s"no metadata in the cache; this is not expected") + val errorMsg = s"The coordinator still owns the transaction partition for $transactionalId, but there is " + + s"no metadata in the cache; this is not expected" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } case other => - throw new IllegalStateException(s"Unexpected error ${other.exceptionName} before appending to txn log for $transactionalId") + val errorMsg = s"Unexpected error ${other.exceptionName} before appending to txn log for $transactionalId" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } } @@ -287,21 +293,30 @@ class TransactionMarkerChannelManager(config: KafkaConfig, def appendCallback(error: Errors): Unit = error match { case Errors.NONE => - trace(s"Completed transaction for ${txnLogAppend.transactionalId} with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state: state after commit: ${txnLogAppend.txnMetadata.state}") + trace(s"Completed transaction for ${txnLogAppend.transactionalId} with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state after commit: ${txnLogAppend.txnMetadata.state}") case Errors.NOT_COORDINATOR => info(s"No longer the coordinator for transactionalId: ${txnLogAppend.transactionalId} while trying to append to transaction log, skip writing to transaction log") case Errors.COORDINATOR_NOT_AVAILABLE => - warn(s"Failed updating transaction state for ${txnLogAppend.transactionalId} when appending to transaction log due to 
${error.exceptionName}. retrying") + info(s"Not available to append $txnLogAppend: possible causes include ${Errors.UNKNOWN_TOPIC_OR_PARTITION}, ${Errors.NOT_ENOUGH_REPLICAS}, " + + s"${Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND} and ${Errors.REQUEST_TIMED_OUT}; retry appending") + // enqueue for retry txnLogAppendRetryQueue.add(txnLogAppend) - case errors: Errors => - throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for ${txnLogAppend.transactionalId}") + case Errors.COORDINATOR_LOAD_IN_PROGRESS => + info(s"Coordinator is loading the partition ${txnStateManager.partitionFor(txnLogAppend.transactionalId)} and hence cannot complete append of $txnLogAppend; " + + s"skip writing to transaction log as the loading process should complete it") + + case other: Errors => + val errorMsg = s"Unexpected error ${other.exceptionName} while appending to transaction log for ${txnLogAppend.transactionalId}" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } - txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback) + txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback, + _ == Errors.COORDINATOR_NOT_AVAILABLE) } def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, producerEpoch: Short, @@ -352,8 +367,11 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } case Right(None) => - throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " + - s"no metadata in the cache; this is not expected") + val errorMsg = s"The coordinator still owns the transaction partition for $transactionalId, but there is " + + s"no metadata in the cache; this is not expected" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) + } } } @@ -388,4 +406,13 @@ class 
TransactionMarkerChannelManager(config: KafkaConfig, case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry) -case class TxnLogAppend(transactionalId: String, coordinatorEpoch: Int, txnMetadata: TransactionMetadata, newMetadata: TxnTransitMetadata) +case class TxnLogAppend(transactionalId: String, coordinatorEpoch: Int, txnMetadata: TransactionMetadata, newMetadata: TxnTransitMetadata) { + + override def toString: String = { + "TxnLogAppend(" + + s"transactionalId=$transactionalId, " + + s"coordinatorEpoch=$coordinatorEpoch, " + + s"txnMetadata=$txnMetadata, " + + s"newMetadata=$newMetadata)" + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 68edc65..bfc7da2 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -30,6 +30,9 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, txnStateManager: TransactionStateManager, txnMarkerChannelManager: TransactionMarkerChannelManager, txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry]) extends RequestCompletionHandler with Logging { + + this.logIdent = "[Transaction Marker Request Completion Handler " + brokerId + "]: " + override def onComplete(response: ClientResponse): Unit = { val requestHeader = response.requestHeader val correlationId = requestHeader.correlationId http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index 7e9add3..405e639 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -282,11 +282,15 @@ private[transaction] class TransactionMetadata(val transactionalId: String, // // if valid, transition is done via overwriting the whole object to ensure synchronization - val toState = pendingState.getOrElse(throw new IllegalStateException(s"TransactionalId $transactionalId " + - "completing transaction state transition while it does not have a pending state")) + val toState = pendingState.getOrElse { + fatal(s"$this's transition to $transitMetadata failed since pendingState is not defined: this should not happen") + + throw new IllegalStateException(s"TransactionalId $transactionalId " + + "completing transaction state transition while it does not have a pending state") + } if (toState != transitMetadata.txnState) { - throwStateTransitionFailure(toState) + throwStateTransitionFailure(transitMetadata) } else { toState match { case Empty => // from initPid @@ -294,7 +298,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, transitMetadata.topicPartitions.nonEmpty || transitMetadata.txnStartTimestamp != -1) { - throwStateTransitionFailure(toState) + throwStateTransitionFailure(transitMetadata) } else { txnTimeoutMs = transitMetadata.txnTimeoutMs producerEpoch = transitMetadata.producerEpoch @@ -307,7 +311,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, txnTimeoutMs != transitMetadata.txnTimeoutMs || txnStartTimestamp > transitMetadata.txnStartTimestamp) { - throwStateTransitionFailure(toState) + throwStateTransitionFailure(transitMetadata) } else { 
txnStartTimestamp = transitMetadata.txnStartTimestamp addPartitions(transitMetadata.topicPartitions) @@ -319,7 +323,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, txnTimeoutMs != transitMetadata.txnTimeoutMs || txnStartTimestamp != transitMetadata.txnStartTimestamp) { - throwStateTransitionFailure(toState) + throwStateTransitionFailure(transitMetadata) } case CompleteAbort | CompleteCommit => // from write markers @@ -327,7 +331,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, txnTimeoutMs != transitMetadata.txnTimeoutMs || transitMetadata.txnStartTimestamp == -1) { - throwStateTransitionFailure(toState) + throwStateTransitionFailure(transitMetadata) } else { txnStartTimestamp = transitMetadata.txnStartTimestamp topicPartitions.clear() @@ -360,14 +364,16 @@ private[transaction] class TransactionMetadata(val transactionalId: String, transitEpoch == producerEpoch + 1 || (transitEpoch == 0 && transitProducerId != producerId) } - private def throwStateTransitionFailure(toState: TransactionState): Unit = { - throw new IllegalStateException(s"TransactionalId $transactionalId failed transition to state $toState " + + private def throwStateTransitionFailure(txnTransitMetadata: TxnTransitMetadata): Unit = { + fatal(s"${this.toString}'s transition to $txnTransitMetadata failed: this should not happen") + + throw new IllegalStateException(s"TransactionalId $transactionalId failed transition to state $txnTransitMetadata " + "due to unexpected metadata") } def pendingTransitionInProgress: Boolean = pendingState.isDefined - override def toString = { + override def toString: String = { "TransactionMetadata(" + s"transactionalId=$transactionalId, " + s"producerId=$producerId, " + http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index da3ba48..d5034b2 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -65,7 +65,7 @@ class TransactionStateManager(brokerId: Int, config: TransactionConfig, time: Time) extends Logging { - this.logIdent = "[Transaction Log Manager " + brokerId + "]: " + this.logIdent = "[Transaction State Manager " + brokerId + "]: " type SendTxnMarkersCallback = (String, Int, TransactionResult, TransactionMetadata, TxnTransitMetadata) => Unit @@ -87,6 +87,16 @@ class TransactionStateManager(brokerId: Int, /** number of partitions for the transaction log topic */ private val transactionTopicPartitionCount = getTransactionTopicPartitionCount + // visible for testing only + private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = { + val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch) + + inWriteLock(stateLock) { + leavingPartitions.remove(partitionAndLeaderEpoch) + loadingPartitions.add(partitionAndLeaderEpoch) + } + } + // this is best-effort expiration of an ongoing transaction which has been open for more than its // txn timeout value, we do not need to grab the lock on the metadata object upon checking its state // since the timestamp is volatile and we will get the lock when actually trying to transit the transaction @@ -114,8 +124,6 @@ class TransactionStateManager(brokerId: Int, } } - - def enableTransactionalIdExpiration() { scheduler.schedule("transactionalId-expiration", () => { val now = time.milliseconds() @@ -218,7 +226,7 @@ class TransactionStateManager(brokerId: Int, return Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) if (leavingPartitions.exists(_.txnPartitionId == partitionId)) - 
Right(Errors.NOT_COORDINATOR) + return Left(Errors.NOT_COORDINATOR) transactionMetadataCache.get(partitionId) match { case Some(cacheEntry) => @@ -345,11 +353,11 @@ class TransactionStateManager(brokerId: Int, if (currentTxnMetadataCacheEntry.isDefined) { val coordinatorEpoch = currentTxnMetadataCacheEntry.get.coordinatorEpoch val metadataPerTxnId = currentTxnMetadataCacheEntry.get.metadataPerTransactionalId - info(s"The metadata cache for txn partition $txnTopicPartition has already exist with epoch $coordinatorEpoch " + + val errorMsg = s"The metadata cache for txn partition $txnTopicPartition has already exist with epoch $coordinatorEpoch " + s"and ${metadataPerTxnId.size} entries while trying to add to it; " + - s"it is likely that another process for loading from the transaction log has just executed earlier before") - - throw new IllegalStateException(s"The metadata cache entry for txn partition $txnTopicPartition has already exist while trying to add to it.") + s"this should not happen" + fatal(errorMsg) + throw new IllegalStateException(errorMsg) } } @@ -376,27 +384,32 @@ class TransactionStateManager(brokerId: Int, if (loadingPartitions.contains(partitionAndLeaderEpoch)) { addLoadedTransactionsToCache(topicPartition.partition, coordinatorEpoch, loadedTransactions) + val transactionsPendingForCompletion = new mutable.ListBuffer[TransactionalIdCoordinatorEpochAndTransitMetadata] loadedTransactions.foreach { case (transactionalId, txnMetadata) => - val result = txnMetadata synchronized { + txnMetadata synchronized { // if state is PrepareCommit or PrepareAbort we need to complete the transaction txnMetadata.state match { case PrepareAbort => - Some(TransactionResult.ABORT, txnMetadata.prepareComplete(time.milliseconds())) + transactionsPendingForCompletion += + TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId, coordinatorEpoch, TransactionResult.ABORT, txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) case PrepareCommit => - 
Some(TransactionResult.COMMIT, txnMetadata.prepareComplete(time.milliseconds())) + transactionsPendingForCompletion += + TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId, coordinatorEpoch, TransactionResult.COMMIT, txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) case _ => // nothing need to be done - None } } - - result.foreach { case (command, newMetadata) => - sendTxnMarkers(transactionalId, coordinatorEpoch, command, txnMetadata, newMetadata) - } } + // we first remove the partition from loading partition then send out the markers for those pending to be + // completed transactions, so that when the markers get sent the attempt of appending the complete transaction + // log would not be blocked by the coordinator loading error loadingPartitions.remove(partitionAndLeaderEpoch) + + transactionsPendingForCompletion.foreach { txnTransitMetadata => + sendTxnMarkers(txnTransitMetadata.transactionalId, txnTransitMetadata.coordinatorEpoch, txnTransitMetadata.result, txnTransitMetadata.txnMetadata, txnTransitMetadata.transitMetadata) + } } } } @@ -448,7 +461,8 @@ class TransactionStateManager(brokerId: Int, def appendTransactionToLog(transactionalId: String, coordinatorEpoch: Int, newMetadata: TxnTransitMetadata, - responseCallback: Errors => Unit): Unit = { + responseCallback: Errors => Unit, + retryOnError: Errors => Boolean = _ => false): Unit = { // generate the message for this transaction metadata val keyBytes = TransactionLog.keyToBytes(transactionalId) @@ -471,8 +485,7 @@ class TransactionStateManager(brokerId: Int, var responseError = if (status.error == Errors.NONE) { Errors.NONE } else { - debug(s"Transaction state update $newMetadata for $transactionalId failed when appending to log " + - s"due to ${status.error.exceptionName}") + debug(s"Appending $transactionalId's new metadata $newMetadata failed due to ${status.error.exceptionName}") // transform the log append error code to the corresponding coordinator error code 
status.error match { @@ -480,31 +493,16 @@ class TransactionStateManager(brokerId: Int, | Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND | Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry - - info(s"Appending transaction message $newMetadata for $transactionalId failed due to " + - s"${status.error.exceptionName}, returning ${Errors.COORDINATOR_NOT_AVAILABLE} to the client") - Errors.COORDINATOR_NOT_AVAILABLE case Errors.NOT_LEADER_FOR_PARTITION => - - info(s"Appending transaction message $newMetadata for $transactionalId failed due to " + - s"${status.error.exceptionName}, returning ${Errors.NOT_COORDINATOR} to the client") - Errors.NOT_COORDINATOR case Errors.MESSAGE_TOO_LARGE | Errors.RECORD_LIST_TOO_LARGE => - - error(s"Appending transaction message $newMetadata for $transactionalId failed due to " + - s"${status.error.exceptionName}, returning UNKNOWN error code to the client") - Errors.UNKNOWN case other => - error(s"Appending metadata message $newMetadata for $transactionalId failed due to " + - s"unexpected error: ${status.error.message}") - other } } @@ -515,7 +513,9 @@ class TransactionStateManager(brokerId: Int, getTransactionState(transactionalId) match { case Left(err) => - responseCallback(err) + info(s"Accessing the cached transaction metadata for $transactionalId returns $err error; " + + s"aborting transition to the new metadata and setting the error in the callback") + responseError = err case Right(Some(epochAndMetadata)) => val metadata = epochAndMetadata.transactionMetadata @@ -524,8 +524,9 @@ class TransactionStateManager(brokerId: Int, if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) { // the cache may have been changed due to txn topic partition emigration and immigration, // in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator - info(s"Updating $transactionalId's transaction state to 
$newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " + - s"has been appended to the log. The cached coordinator epoch has changed to ${epochAndMetadata.coordinatorEpoch}") + info(s"The cached coordinator epoch for $transactionalId has changed to ${epochAndMetadata.coordinatorEpoch} after appended its new metadata $newMetadata " + + s"to the transaction log (txn topic partition ${partitionFor(transactionalId)}) while it was $coordinatorEpoch before appending; " + + s"aborting transition to the new metadata and returning ${Errors.NOT_COORDINATOR} in the callback") responseError = Errors.NOT_COORDINATOR } else { metadata.completeTransitionTo(newMetadata) @@ -536,8 +537,9 @@ class TransactionStateManager(brokerId: Int, case Right(None) => // this transactional id no longer exists, maybe the corresponding partition has already been migrated out. // return NOT_COORDINATOR to let the client re-discover the transaction coordinator - info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " + - s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore") + info(s"The cached coordinator metadata does not exist in the cache anymore for $transactionalId after appended its new metadata $newMetadata " + + s"to the transaction log (txn topic partition ${partitionFor(transactionalId)}) while it was $coordinatorEpoch before appending; " + + s"aborting transition to the new metadata and returning ${Errors.NOT_COORDINATOR} in the callback") responseError = Errors.NOT_COORDINATOR } } else { @@ -547,19 +549,30 @@ class TransactionStateManager(brokerId: Int, val metadata = epochAndTxnMetadata.transactionMetadata metadata synchronized { if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) { - 
debug(s"TransactionalId ${metadata.transactionalId}, resetting pending state since we are returning error $responseError") - metadata.pendingState = None + if (retryOnError(responseError)) { + info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " + + s"not resetting pending state ${metadata.pendingState} but just returning the error in the callback to let the caller retry") + } else { + info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " + + s"resetting pending state from ${metadata.pendingState}, aborting state transition and returning $responseError in the callback") + + metadata.pendingState = None + } } else { - info(s"TransactionalId ${metadata.transactionalId} coordinator epoch changed from " + - s"${epochAndTxnMetadata.coordinatorEpoch} to $coordinatorEpoch after append to log returned $responseError") + info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " + + s"aborting state transition and returning the error in the callback since the coordinator epoch has changed from ${epochAndTxnMetadata.coordinatorEpoch} to $coordinatorEpoch") } } + case Right(None) => // Do nothing here, since we want to return the original append error to the user. - info(s"Found no metadata TransactionalId $transactionalId after append to log returned error $responseError") + info(s"TransactionalId $transactionalId append transaction log for $newMetadata transition failed due to $responseError, " + + s"aborting state transition and returning the error in the callback since metadata is not available in the cache anymore") + case Left(error) => // Do nothing here, since we want to return the original append error to the user. 
- info(s"Retrieving metadata for transactionalId $transactionalId returned $error after append to the log returned error $responseError") + info(s"TransactionalId $transactionalId append transaction log for $newMetadata transition failed due to $responseError, " + + s"aborting state transition and returning the error in the callback since retrieving metadata returned $error") } } @@ -601,7 +614,7 @@ class TransactionStateManager(brokerId: Int, updateCacheCallback, delayedProduceLock = Some(newMetadata)) - trace(s"Appended new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log") + trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log") } } } @@ -636,6 +649,7 @@ private[transaction] case class TransactionConfig(transactionalIdExpirationMs: I case class TransactionalIdAndProducerIdEpoch(transactionalId: String, producerId: Long, producerEpoch: Short) case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int, coordinatorEpoch: Int) -case class TransactionalIdCoordinatorEpochAndMetadata(transactionalId: String, - coordinatorEpoch: Int, - transitMetadata: TxnTransitMetadata) + +case class TransactionalIdCoordinatorEpochAndMetadata(transactionalId: String, coordinatorEpoch: Int, transitMetadata: TxnTransitMetadata) + +case class TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId: String, coordinatorEpoch: Int, result: TransactionResult, txnMetadata: TransactionMetadata, transitMetadata: TxnTransitMetadata) http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/utils/ReplicationUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 29e5d10..c0cb5aa 100644 --- 
a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -32,7 +32,7 @@ object ReplicationUtils extends Logging { def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int, zkVersion: Int): (Boolean,Int) = { - debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(","))) + debug(s"Updated ISR for $topic-$partitionId to ${newLeaderAndIsr.isr.mkString(",")}") val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId) val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch @@ -44,10 +44,10 @@ object ReplicationUtils extends Logging { val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath( ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix, generateIsrChangeJson(isrChangeSet)) - debug("Added " + isrChangeNotificationPath + " for " + isrChangeSet) + debug(s"Added $isrChangeNotificationPath for $isrChangeSet") } - def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = { + private def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = { try { val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path) val writtenLeaderOpt = writtenLeaderAndIsrInfo._1 @@ -67,12 +67,13 @@ object ReplicationUtils extends Logging { } catch { case _: Exception => } - (false,-1) + (false, -1) } - def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = { + def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = { val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition) val 
(leaderAndIsrOpt, stat) = zkUtils.readDataMaybeNull(leaderAndIsrPath) + debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition") leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)) } @@ -85,8 +86,7 @@ object ReplicationUtils extends Logging { val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]] val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int] val zkPathVersion = stat.getVersion - debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch, - isr.toString(), zkPathVersion, path)) + trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path") Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))} } http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index e67ed08..49b1252 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -120,7 +120,8 @@ class TransactionCoordinatorTest { EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), EasyMock.anyObject().asInstanceOf[TxnTransitMetadata], - EasyMock.capture(capturedErrorsCallback))) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { capturedErrorsCallback.getValue.apply(Errors.NONE) @@ -147,7 +148,8 @@ class TransactionCoordinatorTest { EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), 
EasyMock.anyObject().asInstanceOf[TxnTransitMetadata], - EasyMock.capture(capturedErrorsCallback) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject() )).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { capturedErrorsCallback.getValue.apply(Errors.NONE) @@ -293,7 +295,8 @@ class TransactionCoordinatorTest { EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), EasyMock.anyObject().asInstanceOf[TxnTransitMetadata], - EasyMock.capture(capturedErrorsCallback) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject() )) EasyMock.replay(transactionManager) @@ -523,7 +526,8 @@ class TransactionCoordinatorTest { EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), EasyMock.eq(originalMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds())), - EasyMock.capture(capturedErrorsCallback))) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { capturedErrorsCallback.getValue.apply(Errors.NONE) @@ -566,7 +570,8 @@ class TransactionCoordinatorTest { topicPartitions = partitions.toSet, txnStartTimestamp = time.milliseconds(), txnLastUpdateTimestamp = time.milliseconds())), - EasyMock.capture(capturedErrorsCallback))) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { capturedErrorsCallback.getValue.apply(Errors.NONE) @@ -612,7 +617,8 @@ class TransactionCoordinatorTest { EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), EasyMock.eq(expectedTransition), - EasyMock.capture(capturedErrorsCallback))) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = {} }) @@ -677,7 +683,8 @@ class TransactionCoordinatorTest { EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), EasyMock.capture(capturedNewMetadata), - 
EasyMock.capture(capturedErrorsCallback) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject() )).andAnswer(new IAnswer[Unit] { override def answer(): Unit = { metadata.completeTransitionTo(capturedNewMetadata.getValue) @@ -712,7 +719,8 @@ class TransactionCoordinatorTest { EasyMock.eq(transactionalId), EasyMock.eq(coordinatorEpoch), EasyMock.eq(transition), - EasyMock.capture(capturedErrorsCallback))) + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = { if (runCallback) http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 5e328ae..e1d5a6b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -19,15 +19,16 @@ package kafka.coordinator.transaction import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} import kafka.utils.timer.MockTimer import kafka.utils.TestUtils -import org.apache.kafka.clients.NetworkClient -import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest} +import org.apache.kafka.clients.{ClientResponse, NetworkClient} +import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.{MockTime, Utils} import org.apache.kafka.common.{Node, TopicPartition} -import org.easymock.EasyMock +import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.Assert._ import org.junit.Test - 
import com.yammer.metrics.Metrics +import kafka.common.RequestAndCompletionHandler +import org.apache.kafka.common.protocol.Errors import scala.collection.JavaConverters._ import scala.collection.mutable @@ -57,6 +58,8 @@ class TransactionMarkerChannelManagerTest { private val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs, PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L) + private val capturedErrorsCallback: Capture[Errors => Unit] = EasyMock.newCapture() + private val txnMarkerPurgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("txn-purgatory-name", new MockTimer, reaperEnabled = false) @@ -86,7 +89,6 @@ class TransactionMarkerChannelManagerTest { .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) .anyTimes() - EasyMock.replay(txnStateManager) } @Test @@ -97,6 +99,7 @@ class TransactionMarkerChannelManagerTest { @Test def shouldGenerateRequestPerPartitionPerBroker(): Unit = { mockCache() + EasyMock.replay(txnStateManager) EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( EasyMock.eq(partition1.topic), @@ -139,6 +142,7 @@ class TransactionMarkerChannelManagerTest { @Test def shouldSkipSendMarkersWhenLeaderNotFound(): Unit = { mockCache() + EasyMock.replay(txnStateManager) EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( EasyMock.eq(partition1.topic), @@ -166,6 +170,7 @@ class TransactionMarkerChannelManagerTest { @Test def shouldSaveForLaterWhenLeaderUnknownButNotAvailable(): Unit = { mockCache() + EasyMock.replay(txnStateManager) EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( EasyMock.eq(partition1.topic), @@ -219,6 +224,7 @@ class TransactionMarkerChannelManagerTest { @Test def shouldRemoveMarkersForTxnPartitionWhenPartitionEmigrated(): Unit = { mockCache() + EasyMock.replay(txnStateManager) EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( EasyMock.eq(partition1.topic), @@ -256,6 +262,167 @@ class 
TransactionMarkerChannelManagerTest { } @Test + def shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed(): Unit = { + mockCache() + + EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( + EasyMock.eq(partition1.topic), + EasyMock.eq(partition1.partition), + EasyMock.anyObject()) + ).andReturn(Some(broker1)).anyTimes() + EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( + EasyMock.eq(partition2.topic), + EasyMock.eq(partition2.partition), + EasyMock.anyObject()) + ).andReturn(Some(broker2)).anyTimes() + + val txnTransitionMetadata2 = txnMetadata2.prepareComplete(time.milliseconds()) + + EasyMock.expect(txnStateManager.appendTransactionToLog( + EasyMock.eq(transactionalId2), + EasyMock.eq(coordinatorEpoch), + EasyMock.eq(txnTransitionMetadata2), + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + txnMetadata2.completeTransitionTo(txnTransitionMetadata2) + capturedErrorsCallback.getValue.apply(Errors.NONE) + } + }).once() + EasyMock.replay(txnStateManager, metadataCache) + + channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2) + + val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests() + + val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) + for (requestAndHandler <- requestAndHandlers) { + requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + } + + EasyMock.verify(txnStateManager) + + assertEquals(0, txnMarkerPurgatory.watched) + assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers) + assertEquals(None, txnMetadata2.pendingState) + assertEquals(CompleteCommit, txnMetadata2.state) + } + + @Test + def shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError(): Unit = { + mockCache() + + 
EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( + EasyMock.eq(partition1.topic), + EasyMock.eq(partition1.partition), + EasyMock.anyObject()) + ).andReturn(Some(broker1)).anyTimes() + EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( + EasyMock.eq(partition2.topic), + EasyMock.eq(partition2.partition), + EasyMock.anyObject()) + ).andReturn(Some(broker2)).anyTimes() + + val txnTransitionMetadata2 = txnMetadata2.prepareComplete(time.milliseconds()) + + EasyMock.expect(txnStateManager.appendTransactionToLog( + EasyMock.eq(transactionalId2), + EasyMock.eq(coordinatorEpoch), + EasyMock.eq(txnTransitionMetadata2), + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + txnMetadata2.pendingState = None + capturedErrorsCallback.getValue.apply(Errors.NOT_COORDINATOR) + } + }).once() + EasyMock.replay(txnStateManager, metadataCache) + + channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2) + + val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests() + + val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) + for (requestAndHandler <- requestAndHandlers) { + requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + } + + EasyMock.verify(txnStateManager) + + assertEquals(0, txnMarkerPurgatory.watched) + assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers) + assertEquals(None, txnMetadata2.pendingState) + assertEquals(PrepareCommit, txnMetadata2.state) + } + + @Test + def shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError(): Unit = { + mockCache() + + EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( + EasyMock.eq(partition1.topic), + EasyMock.eq(partition1.partition), + EasyMock.anyObject()) + ).andReturn(Some(broker1)).anyTimes() 
+ EasyMock.expect(metadataCache.getPartitionLeaderEndpoint( + EasyMock.eq(partition2.topic), + EasyMock.eq(partition2.partition), + EasyMock.anyObject()) + ).andReturn(Some(broker2)).anyTimes() + + val txnTransitionMetadata2 = txnMetadata2.prepareComplete(time.milliseconds()) + + EasyMock.expect(txnStateManager.appendTransactionToLog( + EasyMock.eq(transactionalId2), + EasyMock.eq(coordinatorEpoch), + EasyMock.eq(txnTransitionMetadata2), + EasyMock.capture(capturedErrorsCallback), + EasyMock.anyObject())) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + capturedErrorsCallback.getValue.apply(Errors.COORDINATOR_NOT_AVAILABLE) + } + }) + .andAnswer(new IAnswer[Unit] { + override def answer(): Unit = { + txnMetadata2.completeTransitionTo(txnTransitionMetadata2) + capturedErrorsCallback.getValue.apply(Errors.NONE) + } + }) + + EasyMock.replay(txnStateManager, metadataCache) + + channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2) + + val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests() + + val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) + for (requestAndHandler <- requestAndHandlers) { + requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response)) + } + + // call this again so that append log will be retried + senderThread.generateRequests() + + EasyMock.verify(txnStateManager) + + assertEquals(0, txnMarkerPurgatory.watched) + assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers) + assertEquals(None, txnMetadata2.pendingState) + assertEquals(CompleteCommit, txnMetadata2.state) + } + + private def createPidErrorMap(errors: Errors) = { + val pidMap = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]() + val errorsMap = new java.util.HashMap[TopicPartition, Errors]() + errorsMap.put(partition1, 
errors) + pidMap.put(producerId2, errorsMap) + pidMap + } + + @Test def shouldCreateMetricsOnStarting(): Unit = { val metrics = Metrics.defaultRegistry.allMetrics http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index b5d7903..032a967 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -208,7 +208,7 @@ class TransactionStateManagerTest { } @Test - def testAppendTransactionToLog() { + def testCompleteTransitionWhenAppendSucceeded(): Unit = { transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) // first insert the initial transaction metadata @@ -226,50 +226,98 @@ class TransactionStateManagerTest { assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) + } - // append to log again with expected failures - txnMetadata1.pendingState = None - val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + @Test + def testAppendFailToCoordinatorNotAvailableError(): Unit = { + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) - // test COORDINATOR_NOT_AVAILABLE cases expectedError = Errors.COORDINATOR_NOT_AVAILABLE + var failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) + } + + @Test + def 
testAppendFailToNotCoordinatorError(): Unit = { + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) - // test NOT_COORDINATOR cases expectedError = Errors.NOT_COORDINATOR + var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) - // test Unknown cases + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + prepareForTxnMessageAppend(Errors.NONE) + transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) + transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) + + prepareForTxnMessageAppend(Errors.NONE) + transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch + 1, new Pool[String, TransactionMetadata]()) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) + transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) + + prepareForTxnMessageAppend(Errors.NONE) + transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) + 
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) + } + + @Test + def testAppendFailToCoordinatorLoadingError(): Unit = { + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) + + expectedError = Errors.COORDINATOR_LOAD_IN_PROGRESS + val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + + prepareForTxnMessageAppend(Errors.NONE) + transactionManager.removeTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch) + transactionManager.addLoadingPartition(partitionId, coordinatorEpoch + 1) + transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) + } + + @Test + def testAppendFailToUnknownError() { + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) + expectedError = Errors.UNKNOWN + var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) assertTrue(txnMetadata1.pendingState.isEmpty) + failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) 
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) @@ -277,6 +325,20 @@ class TransactionStateManagerTest { } @Test + def testPendingStateNotResetOnRetryAppend() { + transactionManager.addLoadedTransactionsToCache(partitionId, coordinatorEpoch, new Pool[String, TransactionMetadata]()) + transactionManager.putTransactionStateIfNotExists(transactionalId1, txnMetadata1) + + expectedError = Errors.COORDINATOR_NOT_AVAILABLE + val failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds()) + + prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) + transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true) + assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1)) + assertEquals(Some(Ongoing), txnMetadata1.pendingState) + } + + @Test def testAppendTransactionToLogWhileProducerFenced() = { transactionManager.addLoadedTransactionsToCache(partitionId, 0, new Pool[String, TransactionMetadata]())