Repository: kafka

Updated Branches:
  refs/heads/trunk f9865d52e -> e110e1c1e
KAFKA-5758; Don't fail fetch request if replica is no longer a follower for a partition

We log a warning instead, which is what we also do if the partition hasn't been
created yet. A few other improvements:

- Return the updated high watermark if the fetch is answered immediately. This is
  more intuitive and consistent with the case where the fetch request is served
  from the purgatory.
- Centralise offline partition handling
- Remove an unnecessary `tryCompleteDelayedProduce` that would already have been
  done by the called method
- A few other minor clean-ups

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jun Rao <jun...@gmail.com>

Closes #3954 from ijuma/kafka-5758-dont-fail-fetch-request-if-replica-is-not-follower


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e110e1c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e110e1c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e110e1c1

Branch: refs/heads/trunk
Commit: e110e1c1ea6a365bcde07a1ee3fe466a6fde1541
Parents: f9865d5
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Mon Oct 2 22:35:55 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon Oct 2 22:35:55 2017 +0100
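As a rough illustration of the behaviour change described above (a self-contained sketch with hypothetical stand-in types, not the Kafka classes touched by this patch; the real logic lives in ReplicaManager.updateFollowerLogReadResults in the diff below): a fetch from a replica that is no longer assigned now degrades to a warning plus empty records for that partition, and a successful follower fetch carries the leader's updated high watermark back immediately.

// Sketch only: ReadResult and LeaderState are hypothetical stand-ins.
object StaleFollowerFetchSketch {

  final case class ReadResult(records: Seq[String], highWatermark: Long)
  final case class LeaderState(assignedReplicas: Set[Int], highWatermark: Long)

  def handleFollowerFetch(followerId: Int, leader: LeaderState, read: ReadResult): ReadResult =
    if (leader.assignedReplicas.contains(followerId))
      // Still a follower: return the data along with the leader's updated high watermark.
      read.copy(highWatermark = leader.highWatermark)
    else {
      // No longer a follower: previously the whole fetch failed (NotAssignedReplicaException);
      // now we only warn and return empty records for this partition.
      println(s"WARN: replica $followerId is not an assigned replica; returning empty records")
      read.copy(records = Seq.empty)
    }

  def main(args: Array[String]): Unit = {
    val leader = LeaderState(assignedReplicas = Set(0, 1), highWatermark = 42L)
    val read = ReadResult(records = Seq("m1", "m2"), highWatermark = 10L)
    assert(handleFollowerFetch(1, leader, read).highWatermark == 42L) // HW propagated
    assert(handleFollowerFetch(2, leader, read).records.isEmpty)      // stale follower
  }
}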
----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  71 +++++----
 core/src/main/scala/kafka/cluster/Replica.scala |   2 +-
 .../group/GroupMetadataManager.scala            |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     | 150 +++++++++++--------
 core/src/main/scala/kafka/utils/CoreUtils.scala |   6 +
 core/src/main/scala/kafka/utils/Pool.scala      |   9 +-
 .../group/GroupMetadataManagerTest.scala        |  15 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 119 +++++++++----
 9 files changed, 232 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8c873f7..ee987f1 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -639,7 +639,7 @@ object AdminUtils extends Logging with AdminUtilities {
   def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
     val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
     // readDataMaybeNull returns Some(null) if the path exists, but there is no data
-    val str: String = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
+    val str = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
     val props = new Properties()
     if (str != null) {
       Json.parseFull(str).foreach { jsValue =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f6f825f..1e3ab75 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,9 +22,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
-import kafka.common.NotAssignedReplicaException
 import kafka.controller.KafkaController
-import kafka.log.LogConfig
+import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
@@ -264,37 +263,31 @@ class Partition(val topic: String,
   }
 
   /**
-   * Update the log end offset of a certain replica of this partition
+   * Update the follower's state in the leader based on the last fetch request. See
+   * [[kafka.cluster.Replica#updateLogReadResult]] for details.
+   *
+   * @return true if the leader's log start offset or high watermark have been updated
    */
-  def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
-    getReplica(replicaId) match {
-      case Some(replica) =>
-        // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
-        val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
-        replica.updateLogReadResult(logReadResult)
-        val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
-        // check if the LW of the partition has incremented
-        // since the replica's logStartOffset may have incremented
-        val leaderLWIncremented = newLeaderLW > oldLeaderLW
-        // check if we need to expand ISR to include this replica
-        // if it is not in the ISR yet
-        val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
-
-        // some delayed operations may be unblocked after HW or LW changed
-        if (leaderLWIncremented || leaderHWIncremented)
-          tryCompleteDelayedRequests()
-
-        debug("Recorded replica %d log end offset (LEO) position %d."
-          .format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset))
-      case None =>
-        throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
-          " is not recognized to be one of the assigned replicas %s for partition %s.")
-          .format(localBrokerId,
-                  replicaId,
-                  logReadResult.info.fetchOffsetMetadata.messageOffset,
-                  assignedReplicas.map(_.brokerId).mkString(","),
-                  topicPartition))
-    }
+  def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
+    val replicaId = replica.brokerId
+    // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
+    val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
+    replica.updateLogReadResult(logReadResult)
+    val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
+    // check if the LW of the partition has incremented
+    // since the replica's logStartOffset may have incremented
+    val leaderLWIncremented = newLeaderLW > oldLeaderLW
+    // check if we need to expand ISR to include this replica
+    // if it is not in the ISR yet
+    val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
+
+    val result = leaderLWIncremented || leaderHWIncremented
+    // some delayed operations may be unblocked after HW or LW changed
+    if (result)
+      tryCompleteDelayedRequests()
+
+    debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
+    result
   }
 
   /**
@@ -305,7 +298,9 @@ class Partition(val topic: String,
    * even if its log end offset is >= HW. However, to be consistent with how the follower determines
    * whether a replica is in-sync, we only check HW.
    *
-   * This function can be triggered when a replica's LEO has incremented
+   * This function can be triggered when a replica's LEO has incremented.
+   *
+   * @return true if the high watermark has been updated
    */
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
@@ -314,7 +309,7 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          if(!inSyncReplicas.contains(replica) &&
+          if (!inSyncReplicas.contains(replica) &&
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
              replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             val newInSyncReplicas = inSyncReplicas + replica
@@ -421,8 +416,10 @@ class Partition(val topic: String,
   def lowWatermarkIfLeader: Long = {
     if (!isLeaderReplicaLocal)
       throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
-    assignedReplicas.filter(replica =>
-      replicaManager.metadataCache.isBrokerAlive(replica.brokerId)).map(_.logStartOffset).reduceOption(_ min _).getOrElse(0L)
+    val logStartOffsets = assignedReplicas.collect {
+      case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) => replica.logStartOffset
+    }
+    CoreUtils.min(logStartOffsets, 0L)
   }
 
   /**
@@ -485,7 +482,7 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0) = {
+  def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 05aee7b..6f32a59 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -72,7 +72,7 @@ class Replica(val brokerId: Int,
    * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
    * high frequency.
    */
-  def updateLogReadResult(logReadResult : LogReadResult) {
+  def updateLogReadResult(logReadResult: LogReadResult) {
     if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
       _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index aba0249..c818b57 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -697,8 +697,7 @@ class GroupMetadataManager(brokerId: Int,
             val timestampType = TimestampType.CREATE_TIME
             val timestamp = time.milliseconds()
 
-            val partitionOpt = replicaManager.getPartition(appendPartition).filter(_ ne ReplicaManager.OfflinePartition)
-            partitionOpt.foreach { partition =>
+            replicaManager.nonOfflinePartition(appendPartition).foreach { partition =>
               val tombstones = ListBuffer.empty[SimpleRecord]
               removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
                 trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3acb88b..b5a93b0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,7 +30,7 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, LogDirNotFoundException, InvalidTimestampException, InvalidTopicException, KafkaStorageException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, KafkaStorageException, LogDirNotFoundException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -89,6 +89,14 @@ case class LogReadResult(info: FetchDataInfo,
     case Some(e) => Errors.forException(e)
   }
 
+  def updateLeaderReplicaInfo(leaderReplica: Replica): LogReadResult =
+    copy(highWatermark = leaderReplica.highWatermark.messageOffset,
+      leaderLogStartOffset = leaderReplica.logStartOffset,
+      leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset)
+
+  def withEmptyFetchInfo: LogReadResult =
+    copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
+
   override def toString =
    s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
    s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
@@ -204,7 +212,7 @@ class ReplicaManager(val config: KafkaConfig,
   val leaderCount = newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def value = getLeaderPartitions.size
+      def value = leaderPartitionsIterator.size
     }
   )
   val partitionCount = newGauge(
@@ -216,7 +224,7 @@ class ReplicaManager(val config: KafkaConfig,
   val offlineReplicaCount = newGauge(
     "OfflineReplicaCount",
     new Gauge[Int] {
-      def value = allPartitions.values.count(_ eq ReplicaManager.OfflinePartition)
+      def value = offlinePartitionsIterator.size
     }
   )
   val underReplicatedPartitions = newGauge(
@@ -229,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig,
   val underMinIsrPartitionCount = newGauge(
     "UnderMinIsrPartitionCount",
    new Gauge[Int] {
-      def value = getLeaderPartitions.count(_.isUnderMinIsr)
+      def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
     }
   )
 
@@ -237,8 +245,7 @@ class ReplicaManager(val config: KafkaConfig,
   val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
   val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
 
-  def underReplicatedPartitionCount: Int =
-    getLeaderPartitions.count(_.isUnderReplicated)
+  def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
@@ -326,7 +333,7 @@ class ReplicaManager(val config: KafkaConfig,
         throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
       val removedPartition = allPartitions.remove(topicPartition)
       if (removedPartition != null) {
-        val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
+        val topicHasPartitions = allPartitions.values.exists(_.topic == topicPartition.topic)
         if (!topicHasPartitions)
           brokerTopicStats.removeMetrics(topicPartition.topic)
         // this will delete the local log. This call may throw exception if the log is on offline directory
@@ -382,6 +389,15 @@ class ReplicaManager(val config: KafkaConfig,
   def getPartition(topicPartition: TopicPartition): Option[Partition] =
     Option(allPartitions.get(topicPartition))
 
+  def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
+    getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
+
+  private def nonOfflinePartitionsIterator: Iterator[Partition] =
+    allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
+
+  private def offlinePartitionsIterator: Iterator[Partition] =
+    allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
+
   def getReplicaOrException(topicPartition: TopicPartition): Replica = {
     getPartition(topicPartition) match {
       case Some(partition) =>
@@ -412,7 +428,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getReplica(topicPartition: TopicPartition, replicaId: Int): Option[Replica] =
-    getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(replicaId))
+    nonOfflinePartition(topicPartition).flatMap(_.getReplica(replicaId))
 
   def getReplica(tp: TopicPartition): Option[Replica] = getReplica(tp, localBrokerId)
 
@@ -765,25 +781,25 @@ class ReplicaManager(val config: KafkaConfig,
                     quota: ReplicaQuota = UnboundedQuota,
                     responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                     isolationLevel: IsolationLevel) {
-    val isFromFollower = replicaId >= 0
-    val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
-    val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
-
-    // read from local logs
-    val logReadResults = readFromLocalLog(
-      replicaId = replicaId,
-      fetchOnlyFromLeader = fetchOnlyFromLeader,
-      readOnlyCommitted = fetchOnlyCommitted,
-      fetchMaxBytes = fetchMaxBytes,
-      hardMaxBytesLimit = hardMaxBytesLimit,
-      readPartitionInfo = fetchInfos,
-      quota = quota,
-      isolationLevel = isolationLevel)
-
-    // if the fetch comes from the follower,
-    // update its corresponding log end offset
-    if(Request.isValidBrokerId(replicaId))
-      updateFollowerLogReadResults(replicaId, logReadResults)
+    val isFromFollower = Request.isValidBrokerId(replicaId)
+    val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId
+    val fetchOnlyCommitted = !isFromFollower
+
+    def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
+      val result = readFromLocalLog(
+        replicaId = replicaId,
+        fetchOnlyFromLeader = fetchOnlyFromLeader,
+        readOnlyCommitted = fetchOnlyCommitted,
+        fetchMaxBytes = fetchMaxBytes,
+        hardMaxBytesLimit = hardMaxBytesLimit,
+        readPartitionInfo = fetchInfos,
+        quota = quota,
+        isolationLevel = isolationLevel)
+      if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
+      else result
+    }
+
+    val logReadResults = readFromLog()
 
     // check if this fetch request can be satisfied right away
     val logReadResultValues = logReadResults.map { case (_, v) => v }
@@ -943,11 +959,11 @@ class ReplicaManager(val config: KafkaConfig,
     var minOneMessage = !hardMaxBytesLimit
     readPartitionInfo.foreach { case (tp, fetchInfo) =>
       val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
-      val messageSetSize = readResult.info.records.sizeInBytes
+      val recordBatchSize = readResult.info.records.sizeInBytes
       // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
-      if (messageSetSize > 0)
+      if (recordBatchSize > 0)
        minOneMessage = false
-      limitBytes = math.max(0, limitBytes - messageSetSize)
+      limitBytes = math.max(0, limitBytes - recordBatchSize)
       result += (tp -> readResult)
     }
     result
@@ -958,9 +974,9 @@ class ReplicaManager(val config: KafkaConfig,
   * the quota is exceeded and the replica is not in sync.
   */
  def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
-    val isReplicaInSync = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition).flatMap { partition =>
-      partition.getReplica(replicaId).map(partition.inSyncReplicas.contains)
-    }.getOrElse(false)
+    val isReplicaInSync = nonOfflinePartition(topicPartition).exists { partition =>
+      partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains)
+    }
    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
  }
@@ -1278,44 +1294,50 @@ private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+    nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
-  private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
-    debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
-    readResults.foreach { case (topicPartition, readResult) =>
-      getPartition(topicPartition) match {
+  /**
+   * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`,
+   * if necessary.
+   */
+  private def updateFollowerLogReadResults(replicaId: Int,
+                                           readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
+    debug(s"Recording follower broker $replicaId log end offsets: $readResults")
+    readResults.map { case (topicPartition, readResult) =>
+      var updatedReadResult = readResult
+      nonOfflinePartition(topicPartition) match {
         case Some(partition) =>
-          if (partition ne ReplicaManager.OfflinePartition)
-            partition.updateReplicaLogReadResult(replicaId, readResult)
-
-          // for producer requests with ack > 1, we need to check
-          // if they can be unblocked after some follower's log end offsets have moved
-          tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition))
+          partition.getReplica(replicaId) match {
+            case Some(replica) =>
+              if (partition.updateReplicaLogReadResult(replica, readResult))
+                partition.leaderReplicaIfLocal.foreach { leaderReplica =>
+                  updatedReadResult = readResult.updateLeaderReplicaInfo(leaderReplica)
+                }
+            case None =>
+              warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
+                s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
+                s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +
+                s"for partition $topicPartition. Empty records will be returned for this partition.")
+              updatedReadResult = readResult.withEmptyFetchInfo
+          }
         case None =>
-          warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition))
+          warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")
       }
+      topicPartition -> updatedReadResult
     }
   }
 
-  private def getLeaderPartitions: List[Partition] =
-    allPartitions.values.filter(partition => (partition ne ReplicaManager.OfflinePartition) && partition.leaderReplicaIfLocal.isDefined).toList
+  private def leaderPartitionsIterator: Iterator[Partition] =
+    nonOfflinePartitionsIterator.filter(_.leaderReplicaIfLocal.isDefined)
 
-  def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = {
-    getPartition(topicPartition) match {
-      case Some(partition) =>
-        if (partition eq ReplicaManager.OfflinePartition)
-          None
-        else
-          partition.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)
-      case None => None
-    }
-  }
+  def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
+    nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
 
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.filter(_ ne ReplicaManager.OfflinePartition).flatMap(_.getReplica(localBrokerId))
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicas = nonOfflinePartitionsIterator.flatMap(_.getReplica(localBrokerId)).filter(_.log.isDefined).toBuffer
+    val replicasByDir = replicas.groupBy(_.log.get.dir.getParent)
     for ((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
       try {
@@ -1334,13 +1356,9 @@ class ReplicaManager(val config: KafkaConfig,
     info(s"Stopping serving replicas in dir $dir")
     replicaStateChangeLock synchronized {
-      val newOfflinePartitions = allPartitions.values.filter { partition =>
-        if (partition eq ReplicaManager.OfflinePartition)
-          false
-        else partition.getReplica(config.brokerId) match {
-          case Some(replica) =>
-            replica.log.isDefined && replica.log.get.dir.getParent == dir
-          case None => false
+      val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition =>
+        partition.getReplica(config.brokerId).exists { replica =>
+          replica.log.isDefined && replica.log.get.dir.getParent == dir
         }
       }.map(_.topicPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index ca753d5..825ee89 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -47,6 +47,12 @@ import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
 object CoreUtils extends Logging {
 
   /**
+   * Return the smallest element in `traversable` if it is not empty. Otherwise return `ifEmpty`.
+   */
+  def min[A, B >: A](traversable: TraversableOnce[A], ifEmpty: A)(implicit cmp: Ordering[B]): A =
+    if (traversable.isEmpty) ifEmpty else traversable.min(cmp)
+
+  /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
    * @return A Runnable that just executes the function

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index df74f29..4ddf557 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -27,11 +27,6 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   private val pool: ConcurrentMap[K, V] = new ConcurrentHashMap[K, V]
   private val createLock = new Object
-
-  def this(m: collection.Map[K, V]) {
-    this()
-    m.foreach(kv => pool.put(kv._1, kv._2))
-  }
 
   def put(k: K, v: V): V = pool.put(k, v)
 
@@ -85,9 +80,9 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
-  def keys: mutable.Set[K] = pool.keySet().asScala
+  def keys: mutable.Set[K] = pool.keySet.asScala
 
-  def values: Iterable[V] = pool.values().asScala
+  def values: Iterable[V] = pool.values.asScala
 
   def clear() { pool.clear() }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 0def9ce..4a509ed 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1026,7 +1026,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1079,7 +1079,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), isFromClient = EasyMock.eq(false),
       requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -1127,7 +1127,7 @@ class GroupMetadataManagerTest {
     val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), isFromClient = EasyMock.eq(false),
       requiredAcks = EasyMock.anyInt()))
       .andReturn(LogAppendInfo.UnknownLogAppendInfo)
@@ -1181,7 +1181,7 @@ class GroupMetadataManagerTest {
      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1259,7 +1259,7 @@ class GroupMetadataManagerTest {
       topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
-    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    mockGetPartition()
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
 
@@ -1389,4 +1389,9 @@ class GroupMetadataManagerTest {
     }.toSeq
   }
 
+  private def mockGetPartition(): Unit = {
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e110e1c1/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 27d4312..b8c78ff 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -46,12 +47,14 @@ class ReplicaManagerTest {
   val topic = "test-topic"
   val time = new MockTime
   val metrics = new Metrics
-  var zkClient : ZkClient = _
-  var zkUtils : ZkUtils = _
+  var zkClient: ZkClient = _
+  var zkUtils: ZkUtils = _
 
   @Before
   def setUp() {
     zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.readData(EasyMock.anyString(), EasyMock.anyObject[Stat])).andReturn(null).anyTimes()
+    EasyMock.replay(zkClient)
     zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
   }
 
@@ -319,7 +322,7 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
 
     try {
-      val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
+      val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
@@ -378,23 +381,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchBeyondHighWatermarkReturnEmptyResponse() {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
-    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
-    props.put("broker.id", Int.box(0))
-    val config = KafkaConfig.fromProps(props)
-    val logProps = new Properties()
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
-    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2))
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
-    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
-    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
-    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
-    EasyMock.replay(metadataCache)
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
-      new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
-      metadataCache, new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
-
+    val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
@@ -432,6 +419,86 @@ class ReplicaManagerTest {
     }
   }
 
+  /**
+   * If a follower sends a fetch request for 2 partitions and it's no longer the follower for one of them, the other
+   * partition should not be affected.
+   */
+  @Test
+  def testFetchMessagesWhenNotFollowerForOnePartition() {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer, aliveBrokerIds = Seq(0, 1, 2))
+
+    try {
+      // Create 2 partitions, assign replica 0 as the leader for both, and a different follower (1 and 2) for each
+      val tp0 = new TopicPartition(topic, 0)
+      val tp1 = new TopicPartition(topic, 1)
+      replicaManager.getOrCreatePartition(tp0).getOrCreateReplica(0)
+      replicaManager.getOrCreatePartition(tp1).getOrCreateReplica(0)
+      val partition0Replicas = Seq[Integer](0, 1).asJava
+      val partition1Replicas = Seq[Integer](0, 2).asJava
+      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(
+          tp0 -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, partition0Replicas, 0, partition0Replicas, true),
+          tp1 -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, partition1Replicas, 0, partition1Replicas, true)
+        ).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+
+      // Append a couple of messages.
+      for (i <- 1 to 2) {
+        appendRecords(replicaManager, tp0, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+        appendRecords(replicaManager, tp1, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
+        val responseStatusMap = responseStatus.toMap
+        assertEquals(2, responseStatus.size)
+        assertEquals(Set(tp0, tp1), responseStatusMap.keySet)
+
+        val tp0Status = responseStatusMap.get(tp0)
+        assertTrue(tp0Status.isDefined)
+        assertEquals(1, tp0Status.get.highWatermark)
+        assertEquals(None, tp0Status.get.lastStableOffset)
+        assertEquals(Errors.NONE, tp0Status.get.error)
+        assertTrue(tp0Status.get.records.batches.iterator.hasNext)
+
+        val tp1Status = responseStatusMap.get(tp1)
+        assertTrue(tp1Status.isDefined)
+        assertEquals(0, tp1Status.get.highWatermark)
+        assertEquals(None, tp1Status.get.lastStableOffset)
+        assertEquals(Errors.NONE, tp1Status.get.error)
+        assertFalse(tp1Status.get.records.batches.iterator.hasNext)
+      }
+
+      replicaManager.fetchMessages(
+        timeout = 1000,
+        replicaId = 1,
+        fetchMinBytes = 0,
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(
+          tp0 -> new PartitionData(1, 0, 100000),
+          tp1 -> new PartitionData(1, 0, 100000)),
+        responseCallback = fetchCallback,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED
+      )
+
+      val tp0Replica = replicaManager.getReplica(tp0)
+      assertTrue(tp0Replica.isDefined)
+      assertEquals("hw should be incremented", 1, tp0Replica.get.highWatermark.messageOffset)
+
+      replicaManager.getReplica(tp1)
+      val tp1Replica = replicaManager.getReplica(tp1)
+      assertTrue(tp1Replica.isDefined)
+      assertEquals("hw should not be incremented", 0, tp1Replica.get.highWatermark.messageOffset)
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   private class CallbackResult[T] {
     private var value: Option[T] = None
     private var fun: Option[T => Unit] = None
@@ -522,18 +589,18 @@ class ReplicaManagerTest {
     result
   }
 
-  private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer): ReplicaManager = {
-    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+  private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
-    props.put("broker.id", Int.box(0))
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
-    val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
+    val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
     val metadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
-    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes()
-    EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
+    aliveBrokerIds.foreach { brokerId =>
+      EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
+    }
     EasyMock.replay(metadataCache)
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](