Repository: kafka
Updated Branches:
  refs/heads/trunk f9865d52e -> e110e1c1e


KAFKA-5758; Don't fail fetch request if replica is no longer a follower for a 
partition

We log a warning instead, which is what we also do if the partition
hasn't been created yet.

A few other improvements:
- Return updated high watermark if fetch is returned immediately.
This seems to be more intuitive and is consistent with the case
where the fetch request is served from the purgatory.
- Centralise offline partition handling
- Remove unnecessary `tryCompleteDelayedProduce` that would
already have been done by the called method
- A few other minor clean-ups

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jun Rao <jun...@gmail.com>

Closes #3954 from 
ijuma/kafka-5758-dont-fail-fetch-request-if-replica-is-not-follower


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e110e1c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e110e1c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e110e1c1

Branch: refs/heads/trunk
Commit: e110e1c1ea6a365bcde07a1ee3fe466a6fde1541
Parents: f9865d5
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Mon Oct 2 22:35:55 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon Oct 2 22:35:55 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  71 +++++----
 core/src/main/scala/kafka/cluster/Replica.scala |   2 +-
 .../group/GroupMetadataManager.scala            |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     | 150 +++++++++++--------
 core/src/main/scala/kafka/utils/CoreUtils.scala |   6 +
 core/src/main/scala/kafka/utils/Pool.scala      |   9 +-
 .../group/GroupMetadataManagerTest.scala        |  15 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 119 +++++++++++----
 9 files changed, 232 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8c873f7..ee987f1 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -639,7 +639,7 @@ object AdminUtils extends Logging with AdminUtilities {
   def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, 
sanitizedEntityName: String): Properties = {
     val entityConfigPath = getEntityConfigPath(rootEntityType, 
sanitizedEntityName)
     // readDataMaybeNull returns Some(null) if the path exists, but there is 
no data
-    val str: String = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
+    val str = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
     val props = new Properties()
     if (str != null) {
       Json.parseFull(str).foreach { jsValue =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index f6f825f..1e3ab75 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,9 +22,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
-import kafka.common.NotAssignedReplicaException
 import kafka.controller.KafkaController
-import kafka.log.LogConfig
+import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
@@ -264,37 +263,31 @@ class Partition(val topic: String,
   }
 
   /**
-   * Update the log end offset of a certain replica of this partition
+   * Update the the follower's state in the leader based on the last fetch 
request. See
+   * [[kafka.cluster.Replica#updateLogReadResult]] for details.
+   *
+   * @return true if the leader's log start offset or high watermark have been 
updated
    */
-  def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) 
{
-    getReplica(replicaId) match {
-      case Some(replica) =>
-        // No need to calculate low watermark if there is no delayed 
DeleteRecordsRequest
-        val oldLeaderLW = if 
(replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader 
else -1L
-        replica.updateLogReadResult(logReadResult)
-        val newLeaderLW = if 
(replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader 
else -1L
-        // check if the LW of the partition has incremented
-        // since the replica's logStartOffset may have incremented
-        val leaderLWIncremented = newLeaderLW > oldLeaderLW
-        // check if we need to expand ISR to include this replica
-        // if it is not in the ISR yet
-        val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
-
-        // some delayed operations may be unblocked after HW or LW changed
-        if (leaderLWIncremented || leaderHWIncremented)
-          tryCompleteDelayedRequests()
-
-        debug("Recorded replica %d log end offset (LEO) position %d."
-          .format(replicaId, 
logReadResult.info.fetchOffsetMetadata.messageOffset))
-      case None =>
-        throw new NotAssignedReplicaException(("Leader %d failed to record 
follower %d's position %d since the replica" +
-          " is not recognized to be one of the assigned replicas %s for 
partition %s.")
-          .format(localBrokerId,
-                  replicaId,
-                  logReadResult.info.fetchOffsetMetadata.messageOffset,
-                  assignedReplicas.map(_.brokerId).mkString(","),
-                  topicPartition))
-    }
+  def updateReplicaLogReadResult(replica: Replica, logReadResult: 
LogReadResult): Boolean = {
+    val replicaId = replica.brokerId
+    // No need to calculate low watermark if there is no delayed 
DeleteRecordsRequest
+    val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed 
> 0) lowWatermarkIfLeader else -1L
+    replica.updateLogReadResult(logReadResult)
+    val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed 
> 0) lowWatermarkIfLeader else -1L
+    // check if the LW of the partition has incremented
+    // since the replica's logStartOffset may have incremented
+    val leaderLWIncremented = newLeaderLW > oldLeaderLW
+    // check if we need to expand ISR to include this replica
+    // if it is not in the ISR yet
+    val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
+
+    val result = leaderLWIncremented || leaderHWIncremented
+    // some delayed operations may be unblocked after HW or LW changed
+    if (result)
+      tryCompleteDelayedRequests()
+
+    debug(s"Recorded replica $replicaId log end offset (LEO) position 
${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
+    result
   }
 
   /**
@@ -305,7 +298,9 @@ class Partition(val topic: String,
    * even if its log end offset is >= HW. However, to be consistent with how 
the follower determines
    * whether a replica is in-sync, we only check HW.
    *
-   * This function can be triggered when a replica's LEO has incremented
+   * This function can be triggered when a replica's LEO has incremented.
+   *
+   * @return true if the high watermark has been updated
    */
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
@@ -314,7 +309,7 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          if(!inSyncReplicas.contains(replica) &&
+          if (!inSyncReplicas.contains(replica) &&
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
              replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             val newInSyncReplicas = inSyncReplicas + replica
@@ -421,8 +416,10 @@ class Partition(val topic: String,
   def lowWatermarkIfLeader: Long = {
     if (!isLeaderReplicaLocal)
       throw new NotLeaderForPartitionException("Leader not local for partition 
%s on broker %d".format(topicPartition, localBrokerId))
-    assignedReplicas.filter(replica =>
-      
replicaManager.metadataCache.isBrokerAlive(replica.brokerId)).map(_.logStartOffset).reduceOption(_
 min _).getOrElse(0L)
+    val logStartOffsets = assignedReplicas.collect {
+      case replica if 
replicaManager.metadataCache.isBrokerAlive(replica.brokerId) => 
replica.logStartOffset
+    }
+    CoreUtils.min(logStartOffsets, 0L)
   }
 
   /**
@@ -485,7 +482,7 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, 
requiredAcks: Int = 0) = {
+  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, 
requiredAcks: Int = 0): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index 05aee7b..6f32a59 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -72,7 +72,7 @@ class Replica(val brokerId: Int,
    * fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
    * high frequency.
    */
-  def updateLogReadResult(logReadResult : LogReadResult) {
+  def updateLogReadResult(logReadResult: LogReadResult) {
     if (logReadResult.info.fetchOffsetMetadata.messageOffset >= 
logReadResult.leaderLogEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, 
logReadResult.fetchTimeMs)
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= 
lastFetchLeaderLogEndOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index aba0249..c818b57 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -697,8 +697,7 @@ class GroupMetadataManager(brokerId: Int,
           val timestampType = TimestampType.CREATE_TIME
           val timestamp = time.milliseconds()
 
-          val partitionOpt = 
replicaManager.getPartition(appendPartition).filter(_ ne 
ReplicaManager.OfflinePartition)
-          partitionOpt.foreach { partition =>
+          replicaManager.nonOfflinePartition(appendPartition).foreach { 
partition =>
             val tombstones = ListBuffer.empty[SimpleRecord]
             removedOffsets.foreach { case (topicPartition, offsetAndMetadata) 
=>
               trace(s"Removing expired/deleted offset and metadata for 
$groupId, $topicPartition: $offsetAndMetadata")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3acb88b..b5a93b0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,7 +30,7 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, LogDirNotFoundException, InvalidTimestampException, 
InvalidTopicException, KafkaStorageException, NotEnoughReplicasException, 
NotLeaderForPartitionException, OffsetOutOfRangeException, 
PolicyViolationException, _}
+import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, InvalidTimestampException, InvalidTopicException, 
KafkaStorageException, LogDirNotFoundException, NotEnoughReplicasException, 
NotLeaderForPartitionException, OffsetOutOfRangeException, 
PolicyViolationException, _}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -89,6 +89,14 @@ case class LogReadResult(info: FetchDataInfo,
     case Some(e) => Errors.forException(e)
   }
 
+  def updateLeaderReplicaInfo(leaderReplica: Replica): LogReadResult =
+    copy(highWatermark = leaderReplica.highWatermark.messageOffset,
+      leaderLogStartOffset = leaderReplica.logStartOffset,
+      leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset)
+
+  def withEmptyFetchInfo: LogReadResult =
+    copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MemoryRecords.EMPTY))
+
   override def toString =
     s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: 
[$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
     s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: 
[$fetchTimeMs], readSize: [$readSize], error: [$error]"
@@ -204,7 +212,7 @@ class ReplicaManager(val config: KafkaConfig,
   val leaderCount = newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def value = getLeaderPartitions.size
+      def value = leaderPartitionsIterator.size
     }
   )
   val partitionCount = newGauge(
@@ -216,7 +224,7 @@ class ReplicaManager(val config: KafkaConfig,
   val offlineReplicaCount = newGauge(
     "OfflineReplicaCount",
     new Gauge[Int] {
-      def value = allPartitions.values.count(_ eq 
ReplicaManager.OfflinePartition)
+      def value = offlinePartitionsIterator.size
     }
   )
   val underReplicatedPartitions = newGauge(
@@ -229,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig,
   val underMinIsrPartitionCount = newGauge(
     "UnderMinIsrPartitionCount",
     new Gauge[Int] {
-      def value = getLeaderPartitions.count(_.isUnderMinIsr)
+      def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
     }
   )
 
@@ -237,8 +245,7 @@ class ReplicaManager(val config: KafkaConfig,
   val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
   val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", 
"failedUpdates", TimeUnit.SECONDS)
 
-  def underReplicatedPartitionCount: Int =
-    getLeaderPartitions.count(_.isUnderReplicated)
+  def underReplicatedPartitionCount: Int = 
leaderPartitionsIterator.count(_.isUnderReplicated)
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
@@ -326,7 +333,7 @@ class ReplicaManager(val config: KafkaConfig,
             throw new KafkaStorageException(s"Partition $topicPartition is on 
an offline disk")
           val removedPartition = allPartitions.remove(topicPartition)
           if (removedPartition != null) {
-            val topicHasPartitions = allPartitions.values.exists(partition => 
topicPartition.topic == partition.topic)
+            val topicHasPartitions = allPartitions.values.exists(_.topic == 
topicPartition.topic)
             if (!topicHasPartitions)
               brokerTopicStats.removeMetrics(topicPartition.topic)
             // this will delete the local log. This call may throw exception 
if the log is on offline directory
@@ -382,6 +389,15 @@ class ReplicaManager(val config: KafkaConfig,
   def getPartition(topicPartition: TopicPartition): Option[Partition] =
     Option(allPartitions.get(topicPartition))
 
+  def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
+    getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
+
+  private def nonOfflinePartitionsIterator: Iterator[Partition] =
+    allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
+
+  private def offlinePartitionsIterator: Iterator[Partition] =
+    allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
+
   def getReplicaOrException(topicPartition: TopicPartition): Replica = {
     getPartition(topicPartition) match {
       case Some(partition) =>
@@ -412,7 +428,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getReplica(topicPartition: TopicPartition, replicaId: Int): 
Option[Replica] =
-    getPartition(topicPartition).filter(_ ne 
ReplicaManager.OfflinePartition).flatMap(_.getReplica(replicaId))
+    nonOfflinePartition(topicPartition).flatMap(_.getReplica(replicaId))
 
   def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, 
localBrokerId)
 
@@ -765,25 +781,25 @@ class ReplicaManager(val config: KafkaConfig,
                     quota: ReplicaQuota = UnboundedQuota,
                     responseCallback: Seq[(TopicPartition, 
FetchPartitionData)] => Unit,
                     isolationLevel: IsolationLevel) {
-    val isFromFollower = replicaId >= 0
-    val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
-    val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
-
-    // read from local logs
-    val logReadResults = readFromLocalLog(
-      replicaId = replicaId,
-      fetchOnlyFromLeader = fetchOnlyFromLeader,
-      readOnlyCommitted = fetchOnlyCommitted,
-      fetchMaxBytes = fetchMaxBytes,
-      hardMaxBytesLimit = hardMaxBytesLimit,
-      readPartitionInfo = fetchInfos,
-      quota = quota,
-      isolationLevel = isolationLevel)
-
-    // if the fetch comes from the follower,
-    // update its corresponding log end offset
-    if(Request.isValidBrokerId(replicaId))
-      updateFollowerLogReadResults(replicaId, logReadResults)
+    val isFromFollower = Request.isValidBrokerId(replicaId)
+    val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId
+    val fetchOnlyCommitted = !isFromFollower
+
+    def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
+      val result = readFromLocalLog(
+        replicaId = replicaId,
+        fetchOnlyFromLeader = fetchOnlyFromLeader,
+        readOnlyCommitted = fetchOnlyCommitted,
+        fetchMaxBytes = fetchMaxBytes,
+        hardMaxBytesLimit = hardMaxBytesLimit,
+        readPartitionInfo = fetchInfos,
+        quota = quota,
+        isolationLevel = isolationLevel)
+      if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
+      else result
+    }
+
+    val logReadResults = readFromLog()
 
     // check if this fetch request can be satisfied right away
     val logReadResultValues = logReadResults.map { case (_, v) => v }
@@ -943,11 +959,11 @@ class ReplicaManager(val config: KafkaConfig,
     var minOneMessage = !hardMaxBytesLimit
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
-      val messageSetSize = readResult.info.records.sizeInBytes
+      val recordBatchSize = readResult.info.records.sizeInBytes
       // Once we read from a non-empty partition, we stop ignoring request and 
partition level size limits
-      if (messageSetSize > 0)
+      if (recordBatchSize > 0)
         minOneMessage = false
-      limitBytes = math.max(0, limitBytes - messageSetSize)
+      limitBytes = math.max(0, limitBytes - recordBatchSize)
       result += (tp -> readResult)
     }
     result
@@ -958,9 +974,9 @@ class ReplicaManager(val config: KafkaConfig,
    *  the quota is exceeded and the replica is not in sync.
    */
   def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: 
TopicPartition, replicaId: Int): Boolean = {
-    val isReplicaInSync = getPartition(topicPartition).filter(_ ne 
ReplicaManager.OfflinePartition).flatMap { partition =>
-      partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
-    }.getOrElse(false)
+    val isReplicaInSync = nonOfflinePartition(topicPartition).exists { 
partition =>
+      partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains)
+    }
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && 
!isReplicaInSync
   }
 
@@ -1278,44 +1294,50 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be 
removed from the ISR")
-    allPartitions.values.filter(_ ne 
ReplicaManager.OfflinePartition).foreach(partition => 
partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+    
nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
-  private def updateFollowerLogReadResults(replicaId: Int, readResults: 
Seq[(TopicPartition, LogReadResult)]) {
-    debug("Recording follower broker %d log read results: %s 
".format(replicaId, readResults))
-    readResults.foreach { case (topicPartition, readResult) =>
-      getPartition(topicPartition) match {
+  /**
+   * Update the follower's fetch state in the leader based on the last fetch 
request and update `readResult`,
+   * if necessary.
+   */
+  private def updateFollowerLogReadResults(replicaId: Int,
+                                           readResults: Seq[(TopicPartition, 
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
+    debug(s"Recording follower broker $replicaId log end offsets: 
$readResults")
+    readResults.map { case (topicPartition, readResult) =>
+      var updatedReadResult = readResult
+      nonOfflinePartition(topicPartition) match {
         case Some(partition) =>
-          if (partition ne ReplicaManager.OfflinePartition)
-            partition.updateReplicaLogReadResult(replicaId, readResult)
-
-          // for producer requests with ack > 1, we need to check
-          // if they can be unblocked after some follower's log end offsets 
have moved
-          tryCompleteDelayedProduce(new 
TopicPartitionOperationKey(topicPartition))
+          partition.getReplica(replicaId) match {
+            case Some(replica) =>
+              if (partition.updateReplicaLogReadResult(replica, readResult))
+                partition.leaderReplicaIfLocal.foreach { leaderReplica =>
+                  updatedReadResult = 
readResult.updateLeaderReplicaInfo(leaderReplica)
+                }
+            case None =>
+              warn(s"Leader $localBrokerId failed to record follower 
$replicaId's position " +
+                s"${readResult.info.fetchOffsetMetadata.messageOffset} since 
the replica is not recognized to be " +
+                s"one of the assigned replicas 
${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +
+                s"for partition $topicPartition. Empty records will be 
returned for this partition.")
+              updatedReadResult = readResult.withEmptyFetchInfo
+          }
         case None =>
-          warn("While recording the replica LEO, the partition %s hasn't been 
created.".format(topicPartition))
+          warn(s"While recording the replica LEO, the partition 
$topicPartition hasn't been created.")
       }
+      topicPartition -> updatedReadResult
     }
   }
 
-  private def getLeaderPartitions: List[Partition] =
-    allPartitions.values.filter(partition => (partition ne 
ReplicaManager.OfflinePartition) && 
partition.leaderReplicaIfLocal.isDefined).toList
+  private def leaderPartitionsIterator: Iterator[Partition] =
+    nonOfflinePartitionsIterator.filter(_.leaderReplicaIfLocal.isDefined)
 
-  def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = {
-    getPartition(topicPartition) match {
-      case Some(partition) =>
-        if (partition eq ReplicaManager.OfflinePartition)
-          None
-        else
-          partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)
-      case None => None
-    }
-  }
+  def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
+    
nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
 
   // Flushes the highwatermark value for all partitions to the highwatermark 
file
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.filter(_ ne 
ReplicaManager.OfflinePartition).flatMap(_.getReplica(localBrokerId))
-    val replicasByDir = 
replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicas = 
nonOfflinePartitionsIterator.flatMap(_.getReplica(localBrokerId)).filter(_.log.isDefined).toBuffer
+    val replicasByDir = replicas.groupBy(_.log.get.dir.getParent)
     for ((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => r.topicPartition -> 
r.highWatermark.messageOffset).toMap
       try {
@@ -1334,13 +1356,9 @@ class ReplicaManager(val config: KafkaConfig,
 
     info(s"Stopping serving replicas in dir $dir")
     replicaStateChangeLock synchronized {
-      val newOfflinePartitions = allPartitions.values.filter { partition =>
-        if (partition eq ReplicaManager.OfflinePartition)
-          false
-        else partition.getReplica(config.brokerId) match {
-          case Some(replica) =>
-            replica.log.isDefined && replica.log.get.dir.getParent == dir
-          case None => false
+      val newOfflinePartitions = nonOfflinePartitionsIterator.filter { 
partition =>
+        partition.getReplica(config.brokerId).exists { replica =>
+          replica.log.isDefined && replica.log.get.dir.getParent == dir
         }
       }.map(_.topicPartition)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index ca753d5..825ee89 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -47,6 +47,12 @@ import org.apache.kafka.common.utils.{Base64, KafkaThread, 
Utils}
 object CoreUtils extends Logging {
 
   /**
+   * Return the smallest element in `traversable` if it is not empty. 
Otherwise return `ifEmpty`.
+   */
+  def min[A, B >: A](traversable: TraversableOnce[A], ifEmpty: A)(implicit 
cmp: Ordering[B]): A =
+    if (traversable.isEmpty) ifEmpty else traversable.min(cmp)
+
+  /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
    * @return A Runnable that just executes the function

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala 
b/core/src/main/scala/kafka/utils/Pool.scala
index df74f29..4ddf557 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -27,11 +27,6 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends 
Iterable[(K, V)] {
 
   private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]
   private val createLock = new Object
-
-  def this(m: collection.Map[K, V]) {
-    this()
-    m.foreach(kv => pool.put(kv._1, kv._2))
-  }
   
   def put(k: K, v: V): V = pool.put(k, v)
   
@@ -85,9 +80,9 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends 
Iterable[(K, V)] {
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
-  def keys: mutable.Set[K] = pool.keySet().asScala
+  def keys: mutable.Set[K] = pool.keySet.asScala
 
-  def values: Iterable[V] = pool.values().asScala
+  def values: Iterable[V] = pool.values.asScala
 
   def clear() { pool.clear() }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0def9ce..4a509ed 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1026,7 +1026,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1079,7 +1079,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
       isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -1127,7 +1127,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture),
       isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -1181,7 +1181,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1259,7 +1259,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1389,4 +1389,9 @@ class GroupMetadataManagerTest {
     }.toSeq
   }
 
+  private def mockGetPartition(): Unit = {
+    
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    
EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 27d4312..b8c78ff 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -46,12 +47,14 @@ class ReplicaManagerTest {
   val topic = "test-topic"
   val time = new MockTime
   val metrics = new Metrics
-  var zkClient : ZkClient = _
-  var zkUtils : ZkUtils = _
+  var zkClient: ZkClient = _
+  var zkUtils: ZkUtils = _
 
   @Before
   def setUp() {
     zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.readData(EasyMock.anyString(), 
EasyMock.anyObject[Stat])).andReturn(null).anyTimes()
+    EasyMock.replay(zkClient)
     zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
   }
 
@@ -319,7 +322,7 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
-      val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
+      val brokerList = Seq[Integer](0, 1).asJava
       val partition = replicaManager.getOrCreatePartition(new 
TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
 
@@ -378,23 +381,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchBeyondHighWatermarkReturnEmptyResponse() {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
-    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
-    props.put("broker.id", Int.box(0))
-    val config = KafkaConfig.fromProps(props)
-    val logProps = new Properties()
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray, LogConfig(logProps))
-    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, 
"host1", 1), createBroker(1, "host2", 2))
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
-    
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-    
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
-    
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
-    
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
-    EasyMock.replay(metadataCache)
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new 
MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, 
time).follower, new BrokerTopicStats,
-      metadataCache, new LogDirFailureChannel(config.logDirs.size), 
Option(this.getClass.getName))
-
+    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, 
aliveBrokerIds = Seq(0, 1, 2))
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
@@ -432,6 +419,86 @@ class ReplicaManagerTest {
     }
   }
 
+  /**
+   * If a follower sends a fetch request for 2 partitions and it's no longer 
the follower for one of them, the other
+   * partition should not be affected.
+   */
+  @Test
+  def testFetchMessagesWhenNotFollowerForOnePartition() {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer, aliveBrokerIds = Seq(0, 1, 2))
+
+    try {
+      // Create 2 partitions, assign replica 0 as the leader for both a 
different follower (1 and 2) for each
+      val tp0 = new TopicPartition(topic, 0)
+      val tp1 = new TopicPartition(topic, 1)
+      replicaManager.getOrCreatePartition(tp0).getOrCreateReplica(0)
+      replicaManager.getOrCreatePartition(tp1).getOrCreateReplica(0)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val partition1Replicas = Seq[Integer](0, 2).asJava
+      val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(
+          tp0 -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, 
partition0Replicas, 0, partition0Replicas, true),
+          tp1 -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, 
partition1Replicas, 0, partition1Replicas, true)
+        ).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+
+      // Append a couple of messages.
+      for (i <- 1 to 2) {
+        appendRecords(replicaManager, tp0, 
TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+        appendRecords(replicaManager, tp1, 
TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      def fetchCallback(responseStatus: Seq[(TopicPartition, 
FetchPartitionData)]) = {
+        val responseStatusMap = responseStatus.toMap
+        assertEquals(2, responseStatus.size)
+        assertEquals(Set(tp0, tp1), responseStatusMap.keySet)
+
+        val tp0Status = responseStatusMap.get(tp0)
+        assertTrue(tp0Status.isDefined)
+        assertEquals(1, tp0Status.get.highWatermark)
+        assertEquals(None, tp0Status.get.lastStableOffset)
+        assertEquals(Errors.NONE, tp0Status.get.error)
+        assertTrue(tp0Status.get.records.batches.iterator.hasNext)
+
+        val tp1Status = responseStatusMap.get(tp1)
+        assertTrue(tp1Status.isDefined)
+        assertEquals(0, tp1Status.get.highWatermark)
+        assertEquals(None, tp0Status.get.lastStableOffset)
+        assertEquals(Errors.NONE, tp1Status.get.error)
+        assertFalse(tp1Status.get.records.batches.iterator.hasNext)
+      }
+
+      replicaManager.fetchMessages(
+        timeout = 1000,
+        replicaId = 1,
+        fetchMinBytes = 0,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(
+          tp0 -> new PartitionData(1, 0, 100000),
+          tp1 -> new PartitionData(1, 0, 100000)),
+        responseCallback = fetchCallback,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED
+      )
+      val tp0Replica = replicaManager.getReplica(tp0)
+      assertTrue(tp0Replica.isDefined)
+      assertEquals("hw should be incremented", 1, 
tp0Replica.get.highWatermark.messageOffset)
+
+      replicaManager.getReplica(tp1)
+      val tp1Replica = replicaManager.getReplica(tp1)
+      assertTrue(tp1Replica.isDefined)
+      assertEquals("hw should not be incremented", 0, 
tp1Replica.get.highWatermark.messageOffset)
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   private class CallbackResult[T] {
     private var value: Option[T] = None
     private var fun: Option[T => Unit] = None
@@ -522,18 +589,18 @@ class ReplicaManagerTest {
     result
   }
 
-  private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer): 
ReplicaManager = {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+  private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, 
aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
-    props.put("broker.id", Int.box(0))
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new 
File(_)).toArray, LogConfig(logProps))
-    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, 
"host1", 1))
+    val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, 
s"host$brokerId", brokerId))
     val metadataCache = EasyMock.createMock(classOf[MetadataCache])
     
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-    
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
-    
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
+    aliveBrokerIds.foreach { brokerId =>
+      
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
+    }
     EasyMock.replay(metadataCache)
 
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](

Reply via email to