Repository: kafka
Updated Branches:
  refs/heads/0.11.0 26c6a0715 -> f6bcb8472


KAFKA-5416; TC should not reset pending state if log append is retried

In `TransactionStateManager`, we reset the pending state if an error occurred 
while appending to the log; this is correct except in the case of the 
`TransactionMarkerChannelManager`, which retries appending to the log. If the 
retry eventually succeeds, the transaction metadata's completing transition 
will throw an IllegalStateException since the pending state is None, and this 
exception will be thrown all the way up to `KafkaApis` and be swallowed.

1. Do not reset the pending state if the append will be retried (as is the case 
when writing the complete transition).
2. A bunch of log4j improvements based on the debugging experience. The main 
principle is to make sure all error codes that are about to be sent to the 
client are logged, and to remove unnecessary log4j entries.
3. Also moved some log entries in ReplicationUtils.scala to `trace`: this is 
rather orthogonal to this PR but I found it rather annoying while debugging the 
logs.
4. A couple of unrelated bug fixes as pointed out by hachikuji and apurvam.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Apurva Mehta <apu...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>

Closes #3287 from guozhangwang/KHotfix-transaction-coordinator-append-callback

(cherry picked from commit 9d6f0f40ceb4859d54454e03f6abfe8b67963e83)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6bcb847
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6bcb847
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6bcb847

Branch: refs/heads/0.11.0
Commit: f6bcb84727ea8710392ce3d0960a6b511dc3d907
Parents: 26c6a07
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Mon Jun 12 12:47:42 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 12:50:16 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    |  43 +++--
 .../TransactionMarkerChannelManager.scala       |  59 +++++--
 ...nsactionMarkerRequestCompletionHandler.scala |   3 +
 .../transaction/TransactionMetadata.scala       |  26 +--
 .../transaction/TransactionStateManager.scala   | 112 +++++++-----
 .../scala/kafka/utils/ReplicationUtils.scala    |  14 +-
 .../TransactionCoordinatorTest.scala            |  24 ++-
 .../TransactionMarkerChannelManagerTest.scala   | 177 ++++++++++++++++++-
 .../TransactionStateManagerTest.scala           |  76 +++++++-
 9 files changed, 419 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 040ab38..7058de6 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -138,7 +138,9 @@ class TransactionCoordinator(brokerId: Int,
           }
 
         case Right(None) =>
-          throw new IllegalStateException("Trying to add metadata to the cache 
still returns NONE; this is not expected")
+          val errorMsg = "Trying to add metadata to the cache still returns 
NONE; this is not expected"
+          fatal(errorMsg)
+          throw new IllegalStateException(errorMsg)
       }
 
       result match {
@@ -169,6 +171,7 @@ class TransactionCoordinator(brokerId: Int,
                   
s"${Topic.TRANSACTION_STATE_TOPIC_NAME}-${txnManager.partitionFor(transactionalId)}")
                 responseCallback(initTransactionMetadata(newMetadata))
               } else {
+                info(s"Returning $error error code to client for 
$transactionalId's InitProducerId request")
                 responseCallback(initTransactionError(error))
               }
             }
@@ -211,8 +214,11 @@ class TransactionCoordinator(brokerId: Int,
           // then when the client retries, we will generate a new producerId.
           Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
         case Dead =>
-          throw new IllegalStateException(s"Found transactionalId 
$transactionalId with state ${txnMetadata.state}. " +
-            s"This is illegal as we should never have transitioned to this 
state.")
+          val errorMsg = s"Found transactionalId $transactionalId with state 
${txnMetadata.state}. " +
+            s"This is illegal as we should never have transitioned to this 
state."
+          fatal(errorMsg)
+          throw new IllegalStateException(errorMsg)
+
       }
     }
   }
@@ -223,6 +229,7 @@ class TransactionCoordinator(brokerId: Int,
                                        partitions: 
collection.Set[TopicPartition],
                                        responseCallback: 
AddPartitionsCallback): Unit = {
     if (transactionalId == null || transactionalId.isEmpty) {
+      debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for 
$transactionalId's AddPartitions request")
       responseCallback(Errors.INVALID_REQUEST)
     } else {
       // try to update the transaction metadata and append the updated 
metadata to txn log;
@@ -260,6 +267,7 @@ class TransactionCoordinator(brokerId: Int,
 
       result match {
         case Left(err) =>
+          info(s"Returning $err error code to client for $transactionalId's 
AddPartitions request")
           responseCallback(err)
 
         case Right((coordinatorEpoch, newMetadata)) =>
@@ -341,8 +349,10 @@ class TransactionCoordinator(brokerId: Int,
               case Empty =>
                 logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
               case Dead =>
-                throw new IllegalStateException(s"Found transactionalId 
$transactionalId with state ${txnMetadata.state}. " +
-                  s"This is illegal as we should never have transitioned to 
this state.")
+                val errorMsg = s"Found transactionalId $transactionalId with 
state ${txnMetadata.state}. " +
+                  s"This is illegal as we should never have transitioned to 
this state."
+                fatal(errorMsg)
+                throw new IllegalStateException(errorMsg)
 
             }
           }
@@ -350,6 +360,7 @@ class TransactionCoordinator(brokerId: Int,
 
       preAppendResult match {
         case Left(err) =>
+          info(s"Aborting append of $txnMarkerResult to transaction log with 
coordinator and returning $err error to client for $transactionalId's 
EndTransaction request")
           responseCallback(err)
 
         case Right((coordinatorEpoch, newMetadata)) =>
@@ -383,25 +394,31 @@ class TransactionCoordinator(brokerId: Int,
                           else
                             Right(txnMetadata, 
txnMetadata.prepareComplete(time.milliseconds()))
                         case Dead =>
-                          throw new IllegalStateException(s"Found 
transactionalId $transactionalId with state ${txnMetadata.state}. " +
-                            s"This is illegal as we should never have 
transitioned to this state.")
+                          val errorMsg = s"Found transactionalId 
$transactionalId with state ${txnMetadata.state}. " +
+                            s"This is illegal as we should never have 
transitioned to this state."
+                          fatal(errorMsg)
+                          throw new IllegalStateException(errorMsg)
 
                       }
                     }
                   } else {
-                    info(s"Updating $transactionalId's transaction state to 
$newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
failed since the transaction coordinator epoch " +
-                      s"has been changed to 
${epochAndMetadata.coordinatorEpoch} after the transaction metadata has been 
successfully appended to the log")
+                    debug(s"The transaction coordinator epoch has changed to 
${epochAndMetadata.coordinatorEpoch} after $txnMarkerResult was " +
+                      s"successfully appended to the log for $transactionalId 
with old epoch $coordinatorEpoch")
 
                     Left(Errors.NOT_COORDINATOR)
                   }
 
                 case Right(None) =>
-                  throw new IllegalStateException(s"The coordinator still owns 
the transaction partition for $transactionalId, but there is " +
-                    s"no metadata in the cache; this is not expected")
+                  val errorMsg = s"The coordinator still owns the transaction 
partition for $transactionalId, but there is " +
+                    s"no metadata in the cache; this is not expected"
+                  fatal(errorMsg)
+                  throw new IllegalStateException(errorMsg)
               }
 
               preSendResult match {
                 case Left(err) =>
+                  info(s"Aborting sending of transaction markers after 
appended $txnMarkerResult to transaction log and returning $err error to client 
for $transactionalId's EndTransaction request")
+
                   responseCallback(err)
 
                 case Right((txnMetadata, newPreSendMetadata)) =>
@@ -412,8 +429,8 @@ class TransactionCoordinator(brokerId: Int,
                   txnMarkerChannelManager.addTxnMarkersToSend(transactionalId, 
coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
               }
             } else {
-              info(s"Updating $transactionalId's transaction state to 
$newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
failed since the transaction message " +
-                s"cannot be appended to the log. Returning error code $error 
to the client")
+              info(s"Aborting sending of transaction markers and returning 
$error error to client for $transactionalId's EndTransaction request of 
$txnMarkerResult, " +
+                s"since appending $newMetadata to transaction log with 
coordinator epoch $coordinatorEpoch failed")
 
               responseCallback(error)
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 63a382f..7b923e6 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -130,6 +130,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                                       txnMarkerPurgatory: 
DelayedOperationPurgatory[DelayedTxnMarker],
                                       time: Time) extends Logging with 
KafkaMetricsGroup {
 
+  this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + 
"]: "
+
   private val interBrokerListenerName: ListenerName = 
config.interBrokerListenerName
 
   private val txnMarkerSendThread: InterBrokerSendThread = {
@@ -197,8 +199,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   def retryLogAppends(): Unit = {
     val txnLogAppendRetries: java.util.List[TxnLogAppend] = new 
util.ArrayList[TxnLogAppend]()
     txnLogAppendRetryQueue.drainTo(txnLogAppendRetries)
-    debug(s"retrying: ${txnLogAppendRetries.size} transaction log appends")
     txnLogAppendRetries.asScala.foreach { txnLogAppend =>
+      debug(s"Retry appending $txnLogAppend transaction log")
       tryAppendToLog(txnLogAppend)
     }
   }
@@ -254,25 +256,29 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
             case Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) =>
               info(s"I am loading the transaction partition that contains 
$transactionalId while my current coordinator epoch is $coordinatorEpoch; " +
-                s"so appending $newMetadata to transaction log since the 
loading process will continue the left work")
+                s"so cancel appending $newMetadata to transaction log since 
the loading process will continue the remaining work")
 
             case Right(Some(epochAndMetadata)) =>
               if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
-                debug(s"Updating $transactionalId's transaction state to 
$txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
succeeded")
+                debug(s"Sending $transactionalId's transaction markers for 
$txnMetadata with coordinator epoch $coordinatorEpoch succeeded, trying to 
append complete transaction log now")
 
                 tryAppendToLog(TxnLogAppend(transactionalId, coordinatorEpoch, 
txnMetadata, newMetadata))
               } else {
-                info(s"Updating $transactionalId's transaction state to 
$txnMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
failed after the transaction markers " +
-                  s"has been sent to brokers. The cached metadata have been 
changed to $epochAndMetadata since preparing to send markers")
+                info(s"The cached metadata $txnMetadata has changed to 
$epochAndMetadata after completed sending the markers with coordinator " +
+                  s"epoch $coordinatorEpoch; abort transiting the metadata to 
$newMetadata as it may have been updated by another process")
               }
 
             case Right(None) =>
-              throw new IllegalStateException(s"The coordinator still owns the 
transaction partition for $transactionalId, but there is " +
-                s"no metadata in the cache; this is not expected")
+              val errorMsg = s"The coordinator still owns the transaction 
partition for $transactionalId, but there is " +
+                s"no metadata in the cache; this is not expected"
+              fatal(errorMsg)
+              throw new IllegalStateException(errorMsg)
           }
 
         case other =>
-          throw new IllegalStateException(s"Unexpected error 
${other.exceptionName} before appending to txn log for $transactionalId")
+          val errorMsg = s"Unexpected error ${other.exceptionName} before 
appending to txn log for $transactionalId"
+          fatal(errorMsg)
+          throw new IllegalStateException(errorMsg)
       }
     }
 
@@ -287,21 +293,30 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     def appendCallback(error: Errors): Unit =
       error match {
         case Errors.NONE =>
-          trace(s"Completed transaction for ${txnLogAppend.transactionalId} 
with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state: state 
after commit: ${txnLogAppend.txnMetadata.state}")
+          trace(s"Completed transaction for ${txnLogAppend.transactionalId} 
with coordinator epoch ${txnLogAppend.coordinatorEpoch}, final state after 
commit: ${txnLogAppend.txnMetadata.state}")
 
         case Errors.NOT_COORDINATOR =>
           info(s"No longer the coordinator for transactionalId: 
${txnLogAppend.transactionalId} while trying to append to transaction log, skip 
writing to transaction log")
 
         case Errors.COORDINATOR_NOT_AVAILABLE =>
-          warn(s"Failed updating transaction state for 
${txnLogAppend.transactionalId} when appending to transaction log due to 
${error.exceptionName}. retrying")
+          info(s"Not available to append $txnLogAppend: possible causes 
include ${Errors.UNKNOWN_TOPIC_OR_PARTITION}, ${Errors.NOT_ENOUGH_REPLICAS}, " +
+            s"${Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND} and 
${Errors.REQUEST_TIMED_OUT}; retry appending")
+
           // enqueue for retry
           txnLogAppendRetryQueue.add(txnLogAppend)
 
-        case errors: Errors =>
-          throw new IllegalStateException(s"Unexpected error 
${errors.exceptionName} while appending to transaction log for 
${txnLogAppend.transactionalId}")
+        case Errors.COORDINATOR_LOAD_IN_PROGRESS =>
+          info(s"Coordinator is loading the partition 
${txnStateManager.partitionFor(txnLogAppend.transactionalId)} and hence cannot 
complete append of $txnLogAppend; " +
+            s"skip writing to transaction log as the loading process should 
complete it")
+
+        case other: Errors =>
+          val errorMsg = s"Unexpected error ${other.exceptionName} while 
appending to transaction log for ${txnLogAppend.transactionalId}"
+          fatal(errorMsg)
+          throw new IllegalStateException(errorMsg)
       }
 
-    txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, 
txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback)
+    txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, 
txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback,
+      _ == Errors.COORDINATOR_NOT_AVAILABLE)
   }
 
   def addTxnMarkersToBrokerQueue(transactionalId: String, producerId: Long, 
producerEpoch: Short,
@@ -352,8 +367,11 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
               }
 
             case Right(None) =>
-              throw new IllegalStateException(s"The coordinator still owns the 
transaction partition for $transactionalId, but there is " +
-                s"no metadata in the cache; this is not expected")
+              val errorMsg = s"The coordinator still owns the transaction 
partition for $transactionalId, but there is " +
+                s"no metadata in the cache; this is not expected"
+              fatal(errorMsg)
+              throw new IllegalStateException(errorMsg)
+
           }
       }
     }
@@ -388,4 +406,13 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
 case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry)
 
-case class TxnLogAppend(transactionalId: String, coordinatorEpoch: Int, 
txnMetadata: TransactionMetadata, newMetadata: TxnTransitMetadata)
+case class TxnLogAppend(transactionalId: String, coordinatorEpoch: Int, 
txnMetadata: TransactionMetadata, newMetadata: TxnTransitMetadata) {
+
+  override def toString: String = {
+    "TxnLogAppend(" +
+      s"transactionalId=$transactionalId, " +
+      s"coordinatorEpoch=$coordinatorEpoch, " +
+      s"txnMetadata=$txnMetadata, " +
+      s"newMetadata=$newMetadata)"
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 68edc65..bfc7da2 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -30,6 +30,9 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                                                 txnStateManager: 
TransactionStateManager,
                                                 txnMarkerChannelManager: 
TransactionMarkerChannelManager,
                                                 txnIdAndMarkerEntries: 
java.util.List[TxnIdAndMarkerEntry]) extends RequestCompletionHandler with 
Logging {
+
+  this.logIdent = "[Transaction Marker Request Completion Handler " + brokerId 
+ "]: "
+
   override def onComplete(response: ClientResponse): Unit = {
     val requestHeader = response.requestHeader
     val correlationId = requestHeader.correlationId

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 7e9add3..405e639 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -282,11 +282,15 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
     //
     // if valid, transition is done via overwriting the whole object to ensure 
synchronization
 
-    val toState = pendingState.getOrElse(throw new 
IllegalStateException(s"TransactionalId $transactionalId " +
-      "completing transaction state transition while it does not have a 
pending state"))
+    val toState = pendingState.getOrElse {
+      fatal(s"$this's transition to $transitMetadata failed since pendingState 
is not defined: this should not happen")
+
+      throw new IllegalStateException(s"TransactionalId $transactionalId " +
+        "completing transaction state transition while it does not have a 
pending state")
+    }
 
     if (toState != transitMetadata.txnState) {
-      throwStateTransitionFailure(toState)
+      throwStateTransitionFailure(transitMetadata)
     } else {
       toState match {
         case Empty => // from initPid
@@ -294,7 +298,7 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
             transitMetadata.topicPartitions.nonEmpty ||
             transitMetadata.txnStartTimestamp != -1) {
 
-            throwStateTransitionFailure(toState)
+            throwStateTransitionFailure(transitMetadata)
           } else {
             txnTimeoutMs = transitMetadata.txnTimeoutMs
             producerEpoch = transitMetadata.producerEpoch
@@ -307,7 +311,7 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
             txnTimeoutMs != transitMetadata.txnTimeoutMs ||
             txnStartTimestamp > transitMetadata.txnStartTimestamp) {
 
-            throwStateTransitionFailure(toState)
+            throwStateTransitionFailure(transitMetadata)
           } else {
             txnStartTimestamp = transitMetadata.txnStartTimestamp
             addPartitions(transitMetadata.topicPartitions)
@@ -319,7 +323,7 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
             txnTimeoutMs != transitMetadata.txnTimeoutMs ||
             txnStartTimestamp != transitMetadata.txnStartTimestamp) {
 
-            throwStateTransitionFailure(toState)
+            throwStateTransitionFailure(transitMetadata)
           }
 
         case CompleteAbort | CompleteCommit => // from write markers
@@ -327,7 +331,7 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
             txnTimeoutMs != transitMetadata.txnTimeoutMs ||
             transitMetadata.txnStartTimestamp == -1) {
 
-            throwStateTransitionFailure(toState)
+            throwStateTransitionFailure(transitMetadata)
           } else {
             txnStartTimestamp = transitMetadata.txnStartTimestamp
             topicPartitions.clear()
@@ -360,14 +364,16 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
     transitEpoch == producerEpoch + 1 || (transitEpoch == 0 && 
transitProducerId != producerId)
   }
 
-  private def throwStateTransitionFailure(toState: TransactionState): Unit = {
-    throw new IllegalStateException(s"TransactionalId $transactionalId failed 
transition to state $toState " +
+  private def throwStateTransitionFailure(txnTransitMetadata: 
TxnTransitMetadata): Unit = {
+    fatal(s"${this.toString}'s transition to $txnTransitMetadata failed: this 
should not happen")
+
+    throw new IllegalStateException(s"TransactionalId $transactionalId failed 
transition to state $txnTransitMetadata " +
       "due to unexpected metadata")
   }
 
   def pendingTransitionInProgress: Boolean = pendingState.isDefined
 
-  override def toString = {
+  override def toString: String = {
     "TransactionMetadata(" +
       s"transactionalId=$transactionalId, " +
       s"producerId=$producerId, " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index da3ba48..d5034b2 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -65,7 +65,7 @@ class TransactionStateManager(brokerId: Int,
                               config: TransactionConfig,
                               time: Time) extends Logging {
 
-  this.logIdent = "[Transaction Log Manager " + brokerId + "]: "
+  this.logIdent = "[Transaction State Manager " + brokerId + "]: "
 
   type SendTxnMarkersCallback = (String, Int, TransactionResult, 
TransactionMetadata, TxnTransitMetadata) => Unit
 
@@ -87,6 +87,16 @@ class TransactionStateManager(brokerId: Int,
   /** number of partitions for the transaction log topic */
   private val transactionTopicPartitionCount = 
getTransactionTopicPartitionCount
 
+  // visible for testing only
+  private[transaction] def addLoadingPartition(partitionId: Int, 
coordinatorEpoch: Int): Unit = {
+    val partitionAndLeaderEpoch = 
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
+
+    inWriteLock(stateLock) {
+      leavingPartitions.remove(partitionAndLeaderEpoch)
+      loadingPartitions.add(partitionAndLeaderEpoch)
+    }
+  }
+
   // this is best-effort expiration of an ongoing transaction which has been 
open for more than its
   // txn timeout value, we do not need to grab the lock on the metadata object 
upon checking its state
   // since the timestamp is volatile and we will get the lock when actually 
trying to transit the transaction
@@ -114,8 +124,6 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-
-
   def enableTransactionalIdExpiration() {
     scheduler.schedule("transactionalId-expiration", () => {
       val now = time.milliseconds()
@@ -218,7 +226,7 @@ class TransactionStateManager(brokerId: Int,
         return Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
 
       if (leavingPartitions.exists(_.txnPartitionId == partitionId))
-        Right(Errors.NOT_COORDINATOR)
+        return Left(Errors.NOT_COORDINATOR)
 
       transactionMetadataCache.get(partitionId) match {
         case Some(cacheEntry) =>
@@ -345,11 +353,11 @@ class TransactionStateManager(brokerId: Int,
     if (currentTxnMetadataCacheEntry.isDefined) {
       val coordinatorEpoch = currentTxnMetadataCacheEntry.get.coordinatorEpoch
       val metadataPerTxnId = 
currentTxnMetadataCacheEntry.get.metadataPerTransactionalId
-      info(s"The metadata cache for txn partition $txnTopicPartition has 
already exist with epoch $coordinatorEpoch " +
+      val errorMsg = s"The metadata cache for txn partition $txnTopicPartition 
has already exist with epoch $coordinatorEpoch " +
         s"and ${metadataPerTxnId.size} entries while trying to add to it; " +
-        s"it is likely that another process for loading from the transaction 
log has just executed earlier before")
-
-      throw new IllegalStateException(s"The metadata cache entry for txn 
partition $txnTopicPartition has already exist while trying to add to it.")
+        s"this should not happen"
+      fatal(errorMsg)
+      throw new IllegalStateException(errorMsg)
     }
   }
 
@@ -376,27 +384,32 @@ class TransactionStateManager(brokerId: Int,
         if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
           addLoadedTransactionsToCache(topicPartition.partition, 
coordinatorEpoch, loadedTransactions)
 
+          val transactionsPendingForCompletion = new 
mutable.ListBuffer[TransactionalIdCoordinatorEpochAndTransitMetadata]
           loadedTransactions.foreach {
             case (transactionalId, txnMetadata) =>
-              val result = txnMetadata synchronized {
+              txnMetadata synchronized {
                 // if state is PrepareCommit or PrepareAbort we need to 
complete the transaction
                 txnMetadata.state match {
                   case PrepareAbort =>
-                    Some(TransactionResult.ABORT, 
txnMetadata.prepareComplete(time.milliseconds()))
+                    transactionsPendingForCompletion +=
+                      
TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId, 
coordinatorEpoch, TransactionResult.ABORT, txnMetadata, 
txnMetadata.prepareComplete(time.milliseconds()))
                   case PrepareCommit =>
-                    Some(TransactionResult.COMMIT, 
txnMetadata.prepareComplete(time.milliseconds()))
+                    transactionsPendingForCompletion +=
+                      
TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId, 
coordinatorEpoch, TransactionResult.COMMIT, txnMetadata, 
txnMetadata.prepareComplete(time.milliseconds()))
                   case _ =>
                     // nothing need to be done
-                    None
                 }
               }
-
-              result.foreach { case (command, newMetadata) =>
-                sendTxnMarkers(transactionalId, coordinatorEpoch, command, 
txnMetadata, newMetadata)
-              }
           }
 
+          // we first remove the partition from loading partition then send 
out the markers for those pending to be
+          // completed transactions, so that when the markers get sent the 
attempt of appending the complete transaction
+          // log would not be blocked by the coordinator loading error
           loadingPartitions.remove(partitionAndLeaderEpoch)
+
+          transactionsPendingForCompletion.foreach { txnTransitMetadata =>
+            sendTxnMarkers(txnTransitMetadata.transactionalId, 
txnTransitMetadata.coordinatorEpoch, txnTransitMetadata.result, 
txnTransitMetadata.txnMetadata, txnTransitMetadata.transitMetadata)
+          }
         }
       }
     }
@@ -448,7 +461,8 @@ class TransactionStateManager(brokerId: Int,
   def appendTransactionToLog(transactionalId: String,
                              coordinatorEpoch: Int,
                              newMetadata: TxnTransitMetadata,
-                             responseCallback: Errors => Unit): Unit = {
+                             responseCallback: Errors => Unit,
+                             retryOnError: Errors => Boolean = _ => false): 
Unit = {
 
     // generate the message for this transaction metadata
     val keyBytes = TransactionLog.keyToBytes(transactionalId)
@@ -471,8 +485,7 @@ class TransactionStateManager(brokerId: Int,
       var responseError = if (status.error == Errors.NONE) {
         Errors.NONE
       } else {
-        debug(s"Transaction state update $newMetadata for $transactionalId 
failed when appending to log " +
-          s"due to ${status.error.exceptionName}")
+        debug(s"Appending $transactionalId's new metadata $newMetadata failed 
due to ${status.error.exceptionName}")
 
         // transform the log append error code to the corresponding 
coordinator error code
         status.error match {
@@ -480,31 +493,16 @@ class TransactionStateManager(brokerId: Int,
                | Errors.NOT_ENOUGH_REPLICAS
                | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
                | Errors.REQUEST_TIMED_OUT => // note that for timed out 
request we return NOT_AVAILABLE error code to let client retry
-
-            info(s"Appending transaction message $newMetadata for 
$transactionalId failed due to " +
-              s"${status.error.exceptionName}, returning 
${Errors.COORDINATOR_NOT_AVAILABLE} to the client")
-
             Errors.COORDINATOR_NOT_AVAILABLE
 
           case Errors.NOT_LEADER_FOR_PARTITION =>
-
-            info(s"Appending transaction message $newMetadata for 
$transactionalId failed due to " +
-              s"${status.error.exceptionName}, returning 
${Errors.NOT_COORDINATOR} to the client")
-
             Errors.NOT_COORDINATOR
 
           case Errors.MESSAGE_TOO_LARGE
                | Errors.RECORD_LIST_TOO_LARGE =>
-
-            error(s"Appending transaction message $newMetadata for 
$transactionalId failed due to " +
-              s"${status.error.exceptionName}, returning UNKNOWN error code to 
the client")
-
             Errors.UNKNOWN
 
           case other =>
-            error(s"Appending metadata message $newMetadata for 
$transactionalId failed due to " +
-              s"unexpected error: ${status.error.message}")
-
             other
         }
       }
@@ -515,7 +513,9 @@ class TransactionStateManager(brokerId: Int,
         getTransactionState(transactionalId) match {
 
           case Left(err) =>
-            responseCallback(err)
+            info(s"Accessing the cached transaction metadata for 
$transactionalId returns $err error; " +
+              s"aborting transition to the new metadata and setting the error 
in the callback")
+            responseError = err
 
           case Right(Some(epochAndMetadata)) =>
             val metadata = epochAndMetadata.transactionMetadata
@@ -524,8 +524,9 @@ class TransactionStateManager(brokerId: Int,
               if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                 // the cache may have been changed due to txn topic partition 
emigration and immigration,
                 // in this case directly return NOT_COORDINATOR to client and 
let it to re-discover the transaction coordinator
-                info(s"Updating $transactionalId's transaction state to 
$newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
failed after the transaction message " +
-                  s"has been appended to the log. The cached coordinator epoch 
has changed to ${epochAndMetadata.coordinatorEpoch}")
+                info(s"The cached coordinator epoch for $transactionalId has 
changed to ${epochAndMetadata.coordinatorEpoch} after appended its new metadata 
$newMetadata " +
+                  s"to the transaction log (txn topic partition 
${partitionFor(transactionalId)}) while it was $coordinatorEpoch before 
appending; " +
+                  s"aborting transition to the new metadata and returning 
${Errors.NOT_COORDINATOR} in the callback")
                 responseError = Errors.NOT_COORDINATOR
               } else {
                 metadata.completeTransitionTo(newMetadata)
@@ -536,8 +537,9 @@ class TransactionStateManager(brokerId: Int,
           case Right(None) =>
             // this transactional id no longer exists, maybe the corresponding 
partition has already been migrated out.
             // return NOT_COORDINATOR to let the client re-discover the 
transaction coordinator
-            info(s"Updating $transactionalId's transaction state (txn topic 
partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator 
epoch $coordinatorEpoch for $transactionalId " +
-              s"failed after the transaction message has been appended to the 
log since the corresponding metadata does not exist in the cache anymore")
+            info(s"The cached coordinator metadata does not exist in the cache 
anymore for $transactionalId after appended its new metadata $newMetadata " +
+              s"to the transaction log (txn topic partition 
${partitionFor(transactionalId)}) while it was $coordinatorEpoch before 
appending; " +
+              s"aborting transition to the new metadata and returning 
${Errors.NOT_COORDINATOR} in the callback")
             responseError = Errors.NOT_COORDINATOR
         }
       } else {
@@ -547,19 +549,30 @@ class TransactionStateManager(brokerId: Int,
             val metadata = epochAndTxnMetadata.transactionMetadata
             metadata synchronized {
               if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
-                debug(s"TransactionalId ${metadata.transactionalId}, resetting 
pending state since we are returning error $responseError")
-                metadata.pendingState = None
+                if (retryOnError(responseError)) {
+                  info(s"TransactionalId ${metadata.transactionalId} append 
transaction log for $newMetadata transition failed due to $responseError, " +
+                    s"not resetting pending state ${metadata.pendingState} but 
just returning the error in the callback to let the caller retry")
+                } else {
+                  info(s"TransactionalId ${metadata.transactionalId} append 
transaction log for $newMetadata transition failed due to $responseError, " +
+                    s"resetting pending state from ${metadata.pendingState}, 
aborting state transition and returning $responseError in the callback")
+
+                  metadata.pendingState = None
+                }
               } else {
-                info(s"TransactionalId ${metadata.transactionalId} coordinator 
epoch changed from " +
-                  s"${epochAndTxnMetadata.coordinatorEpoch} to 
$coordinatorEpoch after append to log returned $responseError")
+                info(s"TransactionalId ${metadata.transactionalId} append 
transaction log for $newMetadata transition failed due to $responseError, " +
+                  s"aborting state transition and returning the error in the 
callback since the coordinator epoch has changed from 
${epochAndTxnMetadata.coordinatorEpoch} to $coordinatorEpoch")
               }
             }
+
           case Right(None) =>
             // Do nothing here, since we want to return the original append 
error to the user.
-            info(s"Found no metadata TransactionalId $transactionalId after 
append to log returned error $responseError")
+            info(s"TransactionalId $transactionalId append transaction log for 
$newMetadata transition failed due to $responseError, " +
+              s"aborting state transition and returning the error in the 
callback since metadata is not available in the cache anymore")
+
           case Left(error) =>
             // Do nothing here, since we want to return the original append 
error to the user.
-            info(s"Retrieving metadata for transactionalId $transactionalId 
returned $error after append to the log returned error $responseError")
+            info(s"TransactionalId $transactionalId append transaction log for 
$newMetadata transition failed due to $responseError, " +
+              s"aborting state transition and returning the error in the 
callback since retrieving metadata returned $error")
         }
 
       }
@@ -601,7 +614,7 @@ class TransactionStateManager(brokerId: Int,
                 updateCacheCallback,
                 delayedProduceLock = Some(newMetadata))
 
-              trace(s"Appended new metadata $newMetadata for transaction id 
$transactionalId with coordinator epoch $coordinatorEpoch to the local 
transaction log")
+              trace(s"Appending new metadata $newMetadata for transaction id 
$transactionalId with coordinator epoch $coordinatorEpoch to the local 
transaction log")
             }
           }
       }
@@ -636,6 +649,7 @@ private[transaction] case class 
TransactionConfig(transactionalIdExpirationMs: I
 case class TransactionalIdAndProducerIdEpoch(transactionalId: String, 
producerId: Long, producerEpoch: Short)
 
 case class TransactionPartitionAndLeaderEpoch(txnPartitionId: Int, 
coordinatorEpoch: Int)
-case class TransactionalIdCoordinatorEpochAndMetadata(transactionalId: String,
-                                                      coordinatorEpoch: Int,
-                                                      transitMetadata: 
TxnTransitMetadata)
+
+case class TransactionalIdCoordinatorEpochAndMetadata(transactionalId: String, 
coordinatorEpoch: Int, transitMetadata: TxnTransitMetadata)
+
+case class TransactionalIdCoordinatorEpochAndTransitMetadata(transactionalId: 
String, coordinatorEpoch: Int, result: TransactionResult, txnMetadata: 
TransactionMetadata, transitMetadata: TxnTransitMetadata)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala 
b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 29e5d10..c0cb5aa 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -32,7 +32,7 @@ object ReplicationUtils extends Logging {
 
   def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, 
newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
     zkVersion: Int): (Boolean,Int) = {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, 
newLeaderAndIsr.isr.mkString(",")))
+    debug(s"Updated ISR for $topic-$partitionId to 
${newLeaderAndIsr.isr.mkString(",")}")
     val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
     val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, 
controllerEpoch)
     // use the epoch of the controller that made the leadership decision, 
instead of the current controller epoch
@@ -44,10 +44,10 @@ object ReplicationUtils extends Logging {
     val isrChangeNotificationPath: String = 
zkUtils.createSequentialPersistentPath(
       ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
       generateIsrChangeJson(isrChangeSet))
-    debug("Added " + isrChangeNotificationPath + " for " + isrChangeSet)
+    debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
   }
 
-  def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, 
expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
+  private def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, 
expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
     try {
       val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path)
       val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
@@ -67,12 +67,13 @@ object ReplicationUtils extends Logging {
     } catch {
       case _: Exception =>
     }
-    (false,-1)
+    (false, -1)
   }
 
-  def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, 
partition: Int):Option[LeaderIsrAndControllerEpoch] = {
+  def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, 
partition: Int): Option[LeaderIsrAndControllerEpoch] = {
     val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
     val (leaderAndIsrOpt, stat) = zkUtils.readDataMaybeNull(leaderAndIsrPath)
+    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
     leaderAndIsrOpt.flatMap(leaderAndIsrStr => 
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
   }
 
@@ -85,8 +86,7 @@ object ReplicationUtils extends Logging {
       val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
       val controllerEpoch = 
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
       val zkPathVersion = stat.getVersion
-      debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for 
leaderAndIsrPath %s".format(leader, epoch,
-        isr.toString(), zkPathVersion, path))
+      trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version 
$zkPathVersion for leaderAndIsrPath $path")
       Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 
zkPathVersion), controllerEpoch))}
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index e67ed08..49b1252 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -120,7 +120,8 @@ class TransactionCoordinatorTest {
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
-      EasyMock.capture(capturedErrorsCallback)))
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
           capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -147,7 +148,8 @@ class TransactionCoordinatorTest {
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
-      EasyMock.capture(capturedErrorsCallback)
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()
     )).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -293,7 +295,8 @@ class TransactionCoordinatorTest {
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.anyObject().asInstanceOf[TxnTransitMetadata],
-      EasyMock.capture(capturedErrorsCallback)
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()
     ))
 
     EasyMock.replay(transactionManager)
@@ -523,7 +526,8 @@ class TransactionCoordinatorTest {
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(originalMetadata.prepareAbortOrCommit(PrepareAbort, 
time.milliseconds())),
-      EasyMock.capture(capturedErrorsCallback)))
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
           capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -566,7 +570,8 @@ class TransactionCoordinatorTest {
         topicPartitions = partitions.toSet,
         txnStartTimestamp = time.milliseconds(),
         txnLastUpdateTimestamp = time.milliseconds())),
-      EasyMock.capture(capturedErrorsCallback)))
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
           capturedErrorsCallback.getValue.apply(Errors.NONE)
@@ -612,7 +617,8 @@ class TransactionCoordinatorTest {
     
EasyMock.expect(transactionManager.appendTransactionToLog(EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(expectedTransition),
-      EasyMock.capture(capturedErrorsCallback)))
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {}
       })
@@ -677,7 +683,8 @@ class TransactionCoordinatorTest {
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.capture(capturedNewMetadata),
-      EasyMock.capture(capturedErrorsCallback)
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()
     )).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         metadata.completeTransitionTo(capturedNewMetadata.getValue)
@@ -712,7 +719,8 @@ class TransactionCoordinatorTest {
       EasyMock.eq(transactionalId),
       EasyMock.eq(coordinatorEpoch),
       EasyMock.eq(transition),
-      EasyMock.capture(capturedErrorsCallback)))
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
       .andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = {
           if (runCallback)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 5e328ae..e1d5a6b 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -19,15 +19,16 @@ package kafka.coordinator.transaction
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.timer.MockTimer
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.NetworkClient
-import org.apache.kafka.common.requests.{TransactionResult, 
WriteTxnMarkersRequest}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient}
+import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, 
WriteTxnMarkersRequest, WriteTxnMarkersResponse}
 import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.easymock.EasyMock
+import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
-
 import com.yammer.metrics.Metrics
+import kafka.common.RequestAndCompletionHandler
+import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -57,6 +58,8 @@ class TransactionMarkerChannelManagerTest {
   private val txnMetadata2 = new TransactionMetadata(transactionalId2, 
producerId2, producerEpoch, txnTimeoutMs,
     PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
 
+  private val capturedErrorsCallback: Capture[Errors => Unit] = 
EasyMock.newCapture()
+
   private val txnMarkerPurgatory = new 
DelayedOperationPurgatory[DelayedTxnMarker]("txn-purgatory-name",
     new MockTimer,
     reaperEnabled = false)
@@ -86,7 +89,6 @@ class TransactionMarkerChannelManagerTest {
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata2))))
       .anyTimes()
 
-    EasyMock.replay(txnStateManager)
   }
 
   @Test
@@ -97,6 +99,7 @@ class TransactionMarkerChannelManagerTest {
   @Test
   def shouldGenerateRequestPerPartitionPerBroker(): Unit = {
     mockCache()
+    EasyMock.replay(txnStateManager)
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
@@ -139,6 +142,7 @@ class TransactionMarkerChannelManagerTest {
   @Test
   def shouldSkipSendMarkersWhenLeaderNotFound(): Unit = {
     mockCache()
+    EasyMock.replay(txnStateManager)
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
@@ -166,6 +170,7 @@ class TransactionMarkerChannelManagerTest {
   @Test
   def shouldSaveForLaterWhenLeaderUnknownButNotAvailable(): Unit = {
     mockCache()
+    EasyMock.replay(txnStateManager)
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
@@ -219,6 +224,7 @@ class TransactionMarkerChannelManagerTest {
   @Test
   def shouldRemoveMarkersForTxnPartitionWhenPartitionEmigrated(): Unit = {
     mockCache()
+    EasyMock.replay(txnStateManager)
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
@@ -256,6 +262,167 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
+  def shouldCompleteAppendToLogOnEndTxnWhenSendMarkersSucceed(): Unit = {
+    mockCache()
+
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition1.topic),
+      EasyMock.eq(partition1.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
+
+    val txnTransitionMetadata2 = 
txnMetadata2.prepareComplete(time.milliseconds())
+
+    EasyMock.expect(txnStateManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId2),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(txnTransitionMetadata2),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          txnMetadata2.completeTransitionTo(txnTransitionMetadata2)
+          capturedErrorsCallback.getValue.apply(Errors.NONE)
+        }
+      }).once()
+    EasyMock.replay(txnStateManager, metadataCache)
+
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, 
txnResult, txnMetadata2, txnTransitionMetadata2)
+
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
senderThread.generateRequests()
+
+    val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
+    for (requestAndHandler <- requestAndHandlers) {
+      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+    }
+
+    EasyMock.verify(txnStateManager)
+
+    assertEquals(0, txnMarkerPurgatory.watched)
+    assertEquals(0, 
channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
+    assertEquals(None, txnMetadata2.pendingState)
+    assertEquals(CompleteCommit, txnMetadata2.state)
+  }
+
+  @Test
+  def shouldAbortAppendToLogOnEndTxnWhenNotCoordinatorError(): Unit = {
+    mockCache()
+
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition1.topic),
+      EasyMock.eq(partition1.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
+
+    val txnTransitionMetadata2 = 
txnMetadata2.prepareComplete(time.milliseconds())
+
+    EasyMock.expect(txnStateManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId2),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(txnTransitionMetadata2),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          txnMetadata2.pendingState = None
+          capturedErrorsCallback.getValue.apply(Errors.NOT_COORDINATOR)
+        }
+      }).once()
+    EasyMock.replay(txnStateManager, metadataCache)
+
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, 
txnResult, txnMetadata2, txnTransitionMetadata2)
+
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
senderThread.generateRequests()
+
+    val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
+    for (requestAndHandler <- requestAndHandlers) {
+      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+    }
+
+    EasyMock.verify(txnStateManager)
+
+    assertEquals(0, txnMarkerPurgatory.watched)
+    assertEquals(0, 
channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
+    assertEquals(None, txnMetadata2.pendingState)
+    assertEquals(PrepareCommit, txnMetadata2.state)
+  }
+
+  @Test
+  def shouldRetryAppendToLogOnEndTxnWhenCoordinatorNotAvailableError(): Unit = 
{
+    mockCache()
+
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition1.topic),
+      EasyMock.eq(partition1.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
+
+    val txnTransitionMetadata2 = 
txnMetadata2.prepareComplete(time.milliseconds())
+
+    EasyMock.expect(txnStateManager.appendTransactionToLog(
+      EasyMock.eq(transactionalId2),
+      EasyMock.eq(coordinatorEpoch),
+      EasyMock.eq(txnTransitionMetadata2),
+      EasyMock.capture(capturedErrorsCallback),
+      EasyMock.anyObject()))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          
capturedErrorsCallback.getValue.apply(Errors.COORDINATOR_NOT_AVAILABLE)
+        }
+      })
+      .andAnswer(new IAnswer[Unit] {
+      override def answer(): Unit = {
+        txnMetadata2.completeTransitionTo(txnTransitionMetadata2)
+        capturedErrorsCallback.getValue.apply(Errors.NONE)
+      }
+    })
+
+    EasyMock.replay(txnStateManager, metadataCache)
+
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, 
txnResult, txnMetadata2, txnTransitionMetadata2)
+
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
senderThread.generateRequests()
+
+    val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
+    for (requestAndHandler <- requestAndHandlers) {
+      requestAndHandler.handler.onComplete(new ClientResponse(new 
RequestHeader(0, 0, "client", 1), null, null, 0, 0, false, null, response))
+    }
+
+    // call this again so that the log append will be retried
+    senderThread.generateRequests()
+
+    EasyMock.verify(txnStateManager)
+
+    assertEquals(0, txnMarkerPurgatory.watched)
+    assertEquals(0, 
channelManager.queueForBroker(broker1.id).get.totalNumMarkers)
+    assertEquals(None, txnMetadata2.pendingState)
+    assertEquals(CompleteCommit, txnMetadata2.state)
+  }
+
+  private def createPidErrorMap(errors: Errors) = {
+    val pidMap = new java.util.HashMap[java.lang.Long, 
java.util.Map[TopicPartition, Errors]]()
+    val errorsMap = new java.util.HashMap[TopicPartition, Errors]()
+    errorsMap.put(partition1, errors)
+    pidMap.put(producerId2, errorsMap)
+    pidMap
+  }
+
+  @Test
   def shouldCreateMetricsOnStarting(): Unit = {
     val metrics = Metrics.defaultRegistry.allMetrics
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6bcb847/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index b5d7903..032a967 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -208,7 +208,7 @@ class TransactionStateManagerTest {
   }
 
   @Test
-  def testAppendTransactionToLog() {
+  def testCompleteTransitionWhenAppendSucceeded(): Unit = {
     transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
 
     // first insert the initial transaction metadata
@@ -226,50 +226,98 @@ class TransactionStateManagerTest {
 
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
+  }
 
-    // append to log again with expected failures
-    txnMetadata1.pendingState = None
-    val failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
+  @Test
+  def testAppendFailToCoordinatorNotAvailableError(): Unit = {
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, 
txnMetadata1)
 
-    // test COORDINATOR_NOT_AVAILABLE cases
     expectedError = Errors.COORDINATOR_NOT_AVAILABLE
+    var failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
+    failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
+    failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
+    failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
+  }
+
+  @Test
+  def testAppendFailToNotCoordinatorError(): Unit = {
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, 
txnMetadata1)
 
-    // test NOT_COORDINATOR cases
     expectedError = Errors.NOT_COORDINATOR
+    var failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
-    // test Unknown cases
+    failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
+    prepareForTxnMessageAppend(Errors.NONE)
+    transactionManager.removeTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch)
+    transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
+
+    prepareForTxnMessageAppend(Errors.NONE)
+    transactionManager.removeTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch)
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch + 1, new Pool[String, TransactionMetadata]())
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, 
txnMetadata1)
+    transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
+
+    prepareForTxnMessageAppend(Errors.NONE)
+    transactionManager.removeTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch)
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
+    transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
+  }
+
+  @Test
+  def testAppendFailToCoordinatorLoadingError(): Unit = {
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, 
txnMetadata1)
+
+    expectedError = Errors.COORDINATOR_LOAD_IN_PROGRESS
+    val failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
+
+    prepareForTxnMessageAppend(Errors.NONE)
+    transactionManager.removeTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch)
+    transactionManager.addLoadingPartition(partitionId, coordinatorEpoch + 1)
+    transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
+  }
+
+  @Test
+  def testAppendFailToUnknownError() {
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, 
txnMetadata1)
+
     expectedError = Errors.UNKNOWN
+    var failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
 
     prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
 
+    failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
     prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
@@ -277,6 +325,20 @@ class TransactionStateManagerTest {
   }
 
   @Test
+  def testPendingStateNotResetOnRetryAppend() {
+    transactionManager.addLoadedTransactionsToCache(partitionId, 
coordinatorEpoch, new Pool[String, TransactionMetadata]())
+    transactionManager.putTransactionStateIfNotExists(transactionalId1, 
txnMetadata1)
+
+    expectedError = Errors.COORDINATOR_NOT_AVAILABLE
+    val failedMetadata = 
txnMetadata1.prepareAddPartitions(Set[TopicPartition](new 
TopicPartition("topic2", 0)), time.milliseconds())
+
+    prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+    transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback, _ => true)
+    assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
+    assertEquals(Some(Ongoing), txnMetadata1.pendingState)
+  }
+
+  @Test
   def testAppendTransactionToLogWhileProducerFenced() = {
     transactionManager.addLoadedTransactionsToCache(partitionId, 0, new 
Pool[String, TransactionMetadata]())
 

Reply via email to