Repository: kafka Updated Branches: refs/heads/trunk 1d24e10ae -> cb2fdbd6c
MINOR: Add some logging for the transaction coordinator Author: Apurva Mehta <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]> Closes #3278 from apurvam/MINOR-add-logging-to-transaction-coordinator-in-all-failure-cases Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb2fdbd6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb2fdbd6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb2fdbd6 Branch: refs/heads/trunk Commit: cb2fdbd6c1621fd578c4c1422ead62faf7ae3f7f Parents: 1d24e10 Author: Apurva Mehta <[email protected]> Authored: Tue Jun 20 11:31:59 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Jun 20 11:31:59 2017 -0700 ---------------------------------------------------------------------- .../kafka/coordinator/transaction/TransactionCoordinator.scala | 3 +-- .../transaction/TransactionMarkerRequestCompletionHandler.scala | 2 +- .../kafka/coordinator/transaction/TransactionStateManager.scala | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cb2fdbd6/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 51424f8..54e6923 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -400,7 +400,6 @@ class TransactionCoordinator(brokerId: Int, preSendResult match { case Left(err) => info(s"Aborting sending of transaction markers after appended $txnMarkerResult to transaction log and returning $err error to client for $transactionalId's EndTransaction request") - responseCallback(err) case Right((txnMetadata, newPreSendMetadata)) => @@ -435,7 +434,7 @@ class TransactionCoordinator(brokerId: Int, TransactionResult.ABORT, (error: Errors) => error match { case Errors.NONE => - debug(s"Completed rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout") + info(s"Completed rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} due to timeout") case Errors.INVALID_PRODUCER_ID_MAPPING | Errors.INVALID_PRODUCER_EPOCH | Errors.CONCURRENT_TRANSACTIONS => http://git-wip-us.apache.org/repos/asf/kafka/blob/cb2fdbd6/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 4abaada..54960b9 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -182,7 +182,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, if (!abortSending) { if (retryPartitions.nonEmpty) { - trace(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " + + debug(s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " + s"under coordinator epoch ${txnMarker.coordinatorEpoch}") // re-enqueue with possible new leaders of the partitions http://git-wip-us.apache.org/repos/asf/kafka/blob/cb2fdbd6/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index c0e4a77..e0d5076 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -506,7 +506,6 @@ class TransactionStateManager(brokerId: Int, info(s"Accessing the cached transaction metadata for $transactionalId returns $err error; " + s"aborting transition to the new metadata and setting the error in the callback") responseError = err - case Right(Some(epochAndMetadata)) => val metadata = epochAndMetadata.transactionMetadata
