Repository: kafka
Updated Branches:
  refs/heads/trunk 3fd9be49a -> fed3f1f88


MINOR: Avoid trace logging computation in `checkEnoughReplicasReachOffset`

`numAcks` is only used in the `trace` logging statement, so it should be a `def`
instead of a `val` to avoid computing it when trace logging is disabled. Also took
the chance to improve the code and documentation a little.
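
A minimal sketch of the idea (not the actual Kafka code; `traceEnabled`,
`replicaOffsets` and the simplified `trace` below are made-up stand-ins): with a
by-name `msg` parameter, a `def` is only evaluated when the message is actually
built, whereas a `val` is computed unconditionally on every call.

    object NumAcksSketch extends App {
      // Stand-in for the logger's trace level check.
      val traceEnabled = false

      // By-name parameter: `msg` is only evaluated if trace logging is enabled.
      def trace(msg: => String): Unit = if (traceEnabled) println(msg)

      val replicaOffsets = Seq(1L, 5L, 9L)
      val requiredOffset = 4L

      // As a `val`, this count would run eagerly even with trace logging off.
      // As a `def`, it only runs when the interpolated message is built,
      // i.e. never while trace logging is disabled.
      def numAcks = replicaOffsets.count(_ >= requiredOffset)

      trace(s"$numAcks acks satisfied with acks = -1")
    }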

Author: Ismael Juma <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Ewen Cheslack-Postava 
<[email protected]>

Closes #1449 from ijuma/minor-avoid-trace-logging-computation-in-partition


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fed3f1f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fed3f1f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fed3f1f8

Branch: refs/heads/trunk
Commit: fed3f1f8890b219e4247fd9de1305ad18679ff99
Parents: 3fd9be4
Author: Ismael Juma <[email protected]>
Authored: Tue May 31 09:03:18 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Tue May 31 09:03:18 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 42 ++++++++++----------
 .../scala/kafka/server/DelayedProduce.scala     | 22 ++++------
 2 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fed3f1f8/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e79bdc..ea22e87 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -296,46 +296,48 @@ class Partition(val topic: String,
   }
 
   /*
-   * Note that this method will only be called if requiredAcks = -1
-   * and we are waiting for all replicas in ISR to be fully caught up to
-   * the (local) leader's offset corresponding to this produce request
-   * before we acknowledge the produce request.
+   * Returns a tuple where the first element is a boolean indicating whether enough replicas reached `requiredOffset`
+   * and the second element is an error (which would be `Errors.NONE` for no error).
+   *
+   * Note that this method will only be called if requiredAcks = -1 and we are waiting for all replicas in ISR to be
+   * fully caught up to the (local) leader's offset corresponding to this produce request before we acknowledge the
+   * produce request.
    */
-  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
+  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
-        val numAcks = curInSyncReplicas.count(r => {
+
+        def numAcks = curInSyncReplicas.count { r =>
           if (!r.isLocal)
             if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace("Replica %d of %s-%d received offset 
%d".format(r.brokerId, topic, partitionId, requiredOffset))
+              trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} 
received offset $requiredOffset")
               true
             }
             else
               false
           else
             true /* also count the local (leader) replica */
-        })
+        }
 
-        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, 
topic, partitionId))
+        trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks 
= -1")
 
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
+        if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
           /*
-          * The topic may be configured not to accept messages if there are not enough replicas in ISR
-          * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
-          */
-          if (minIsr <= curInSyncReplicas.size) {
-            (true, Errors.NONE.code)
-          } else {
-            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
-          }
+           * The topic may be configured not to accept messages if there are not enough replicas in ISR
+           * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
+           */
+          if (minIsr <= curInSyncReplicas.size)
+            (true, Errors.NONE)
+          else
+            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
         } else
-          (false, Errors.NONE.code)
+          (false, Errors.NONE)
       case None =>
-        (false, Errors.NOT_LEADER_FOR_PARTITION.code)
+        (false, Errors.NOT_LEADER_FOR_PARTITION)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fed3f1f8/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index be1be4f..5a59d3b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -82,32 +82,26 @@ class DelayedProduce(delayMs: Long,
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
     produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-      trace("Checking produce satisfaction for %s, current status %s"
-        .format(topicAndPartition, status))
+      trace(s"Checking produce satisfaction for ${topicAndPartition}, current 
status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
-        val (hasEnough, errorCode) = partitionOpt match {
+        val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
           case Some(partition) =>
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
-            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         }
-        if (errorCode != Errors.NONE.code) {
-          // Case B.1
+        // Case B.1 || B.2
+        if (error != Errors.NONE || hasEnough) {
           status.acksPending = false
-          status.responseStatus.errorCode = errorCode
-        } else if (hasEnough) {
-          // Case B.2
-          status.acksPending = false
-          status.responseStatus.errorCode = Errors.NONE.code
+          status.responseStatus.errorCode = error.code
         }
       }
     }
 
-    // check if each partition has satisfied at lease one of case A and case B
-    if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
+    // check if every partition has satisfied at least one of case A or B
+    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
       forceComplete()
     else
       false
