This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f8176a3 MINOR: Add RaftReplicaManager (#10069)
f8176a3 is described below
commit f8176a31615c82946fe9835143c6fa5d27023583
Author: Ron Dagostino <[email protected]>
AuthorDate: Wed Feb 10 17:37:16 2021 -0500
MINOR: Add RaftReplicaManager (#10069)
This adds the logic to apply partition metadata when consuming from the
Raft-based metadata log.

RaftReplicaManager extends ReplicaManager for now to minimize changes to
existing code for the 2.8 release. We will likely adjust this hierarchy at a
later time (e.g. introducing a trait and adding a helper to refactor common
code). For now, we expose the necessary fields and methods in ReplicaManager
by changing their scope from private to protected, and we refactor out a
couple of pieces of logic that are shared between the two implementations
(stopping replicas and adding log dir fetchers).
Reviewers: Colin P. McCabe <[email protected]>, Ismael Juma <[email protected]>
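
For orientation, the shape of the change is roughly the following (a
simplified, self-contained Scala sketch; the class roles mirror
RaftReplicaChangeDelegateHelper / RaftReplicaChangeDelegate /
RaftReplicaManager in the patch below, but the names and bodies here are
illustrative only, not the real logic):

    // The manager exposes its internals to the change logic through a narrow
    // helper trait, so the delegate can be unit-tested against a mock
    // (compare RaftReplicaManagerTest in this patch).
    trait ChangeDelegateHelper {
      def markDeferred(topicPartition: String): Unit
    }

    class ChangeDelegate(helper: ChangeDelegateHelper) {
      // operates purely through the helper, never on the manager directly
      def makeDeferred(topicPartitions: Seq[String]): Unit =
        topicPartitions.foreach(helper.markDeferred)
    }

    class Manager {
      private val deferred = scala.collection.mutable.Set[String]()
      // visible/overwriteable for testing, like RaftReplicaManager.delegate
      var delegate: ChangeDelegate = new ChangeDelegate(new ChangeDelegateHelper {
        override def markDeferred(topicPartition: String): Unit = deferred += topicPartition
      })
    }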
---
.../scala/kafka/server/DelayedDeleteRecords.scala | 2 +-
.../kafka/server/RaftReplicaChangeDelegate.scala | 246 ++++++++++++++
.../scala/kafka/server/RaftReplicaManager.scala | 377 +++++++++++++++++++++
.../main/scala/kafka/server/ReplicaManager.scala | 198 +++++------
.../kafka/server/metadata/MetadataImage.scala | 7 +-
.../unit/kafka/server/RaftReplicaManagerTest.scala | 238 +++++++++++++
6 files changed, 961 insertions(+), 107 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 164cab5..dae17c6 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -84,7 +84,7 @@ class DelayedDeleteRecords(delayMs: Long,
             (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
         }
-      case HostedPartition.Deferred(_) =>
+      case _: HostedPartition.Deferred =>
         (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
case HostedPartition.Offline =>
diff --git a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
new file mode 100644
index 0000000..0b7ee42
--- /dev/null
+++ b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.Log
+import kafka.server.checkpoints.OffsetCheckpoints
+import kafka.server.metadata.{MetadataBrokers, MetadataPartition}
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+
+import scala.collection.{Map, Set, mutable}
+
+trait RaftReplicaChangeDelegateHelper {
+ def stateChangeLogger: StateChangeLogger
+ def replicaFetcherManager: ReplicaFetcherManager
+ def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager
+ def markDeferred(state: HostedPartition.Deferred): Unit
+ def getLogDir(topicPartition: TopicPartition): Option[String]
+ def error(msg: => String, e: => Throwable): Unit
+ def markOffline(topicPartition: TopicPartition): Unit
+  def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit
+ def isShuttingDown: Boolean
+ def initialFetchOffset(log: Log): Long
+ def config: KafkaConfig
+}
+
+class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
+ def makeDeferred(partitionsNewMap: Map[Partition, Boolean],
+ metadataOffset: Long): Unit = {
+ val traceLoggingEnabled = helper.stateChangeLogger.isTraceEnabled
+ if (traceLoggingEnabled)
+ partitionsNewMap.forKeyValue { (partition, isNew) =>
+        helper.stateChangeLogger.trace(s"Metadata batch $metadataOffset: starting the " +
+          s"become-deferred transition for partition ${partition.topicPartition} isNew=$isNew")
+ }
+
+ // Stop fetchers for all the partitions
+    helper.replicaFetcherManager.removeFetcherForPartitions(partitionsNewMap.keySet.map(_.topicPartition))
+    helper.stateChangeLogger.info(s"Metadata batch $metadataOffset: as part of become-deferred request, " +
+ s"stopped any fetchers for ${partitionsNewMap.size} partitions")
+ // mark all the partitions as deferred
+    partitionsNewMap.forKeyValue((partition, isNew) => helper.markDeferred(HostedPartition.Deferred(partition, isNew)))
+
+ helper.replicaFetcherManager.shutdownIdleFetcherThreads()
+ helper.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+
+ if (traceLoggingEnabled)
+ partitionsNewMap.keys.foreach { partition =>
+        helper.stateChangeLogger.trace(s"Completed batch $metadataOffset become-deferred " +
+ s"transition for partition ${partition.topicPartition}")
+ }
+ }
+
+ def makeLeaders(prevPartitionsAlreadyExisting: Set[MetadataPartition],
+ partitionStates: Map[Partition, MetadataPartition],
+ highWatermarkCheckpoints: OffsetCheckpoints,
+ metadataOffset: Option[Long]): Set[Partition] = {
+ val partitionsMadeLeaders = mutable.Set[Partition]()
+ val traceLoggingEnabled = helper.stateChangeLogger.isTraceEnabled
+ val deferredBatches = metadataOffset.isEmpty
+ val topLevelLogPrefix = if (deferredBatches)
+ "Metadata batch <multiple deferred>"
+ else
+ s"Metadata batch ${metadataOffset.get}"
+ try {
+ // First stop fetchers for all the partitions
+      helper.replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
+      helper.stateChangeLogger.info(s"$topLevelLogPrefix: stopped ${partitionStates.size} fetcher(s)")
+ // Update the partition information to be the leader
+ partitionStates.forKeyValue { (partition, state) =>
+ val topicPartition = partition.topicPartition
+ val partitionLogMsgPrefix = if (deferredBatches)
+ s"Apply deferred leader partition $topicPartition"
+ else
+ s"Metadata batch ${metadataOffset.get} $topicPartition"
+ try {
+ val isrState = state.toLeaderAndIsrPartitionState(
+ !prevPartitionsAlreadyExisting(state))
+ if (partition.makeLeader(isrState, highWatermarkCheckpoints)) {
+ partitionsMadeLeaders += partition
+ if (traceLoggingEnabled) {
+            helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: completed the become-leader state change.")
+ }
+ } else {
+          helper.stateChangeLogger.info(s"$partitionLogMsgPrefix: skipped the " +
+ "become-leader state change since it is already the leader.")
+ }
+ } catch {
+ case e: KafkaStorageException =>
+            helper.stateChangeLogger.error(s"$partitionLogMsgPrefix: unable to make " +
+              s"leader because the replica for the partition is offline due to disk error $e")
+            val dirOpt = helper.getLogDir(topicPartition)
+            helper.error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
+ helper.markOffline(topicPartition)
+ }
+ }
+ } catch {
+ case e: Throwable =>
+        helper.stateChangeLogger.error(s"$topLevelLogPrefix: error while processing batch.", e)
+ // Re-throw the exception for it to be caught in BrokerMetadataListener
+ throw e
+ }
+ partitionsMadeLeaders
+ }
+
+ def makeFollowers(prevPartitionsAlreadyExisting: Set[MetadataPartition],
+ currentBrokers: MetadataBrokers,
+ partitionStates: Map[Partition, MetadataPartition],
+ highWatermarkCheckpoints: OffsetCheckpoints,
+ metadataOffset: Option[Long]): Set[Partition] = {
+ val traceLoggingEnabled = helper.stateChangeLogger.isTraceEnabled
+ val deferredBatches = metadataOffset.isEmpty
+ val topLevelLogPrefix = if (deferredBatches)
+ "Metadata batch <multiple deferred>"
+ else
+ s"Metadata batch ${metadataOffset.get}"
+ if (traceLoggingEnabled) {
+ partitionStates.forKeyValue { (partition, state) =>
+ val topicPartition = partition.topicPartition
+ val partitionLogMsgPrefix = if (deferredBatches)
+ s"Apply deferred follower partition $topicPartition"
+ else
+ s"Metadata batch ${metadataOffset.get} $topicPartition"
+        helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: starting the " +
+ s"become-follower transition with leader ${state.leaderId}")
+ }
+ }
+
+ val partitionsMadeFollower: mutable.Set[Partition] = mutable.Set()
+ // all brokers, including both alive and not
+    val acceptableLeaderBrokerIds = currentBrokers.iterator().map(broker => broker.id).toSet
+    val allBrokersByIdMap = currentBrokers.iterator().map(broker => broker.id -> broker).toMap
+ try {
+ partitionStates.forKeyValue { (partition, state) =>
+ val topicPartition = partition.topicPartition
+ val partitionLogMsgPrefix = if (deferredBatches)
+ s"Apply deferred follower partition $topicPartition"
+ else
+ s"Metadata batch ${metadataOffset.get} $topicPartition"
+ try {
+ val isNew = !prevPartitionsAlreadyExisting(state)
+ if (!acceptableLeaderBrokerIds.contains(state.leaderId)) {
+            // The leader broker should always be present in the metadata cache.
+            // If not, we should record the error message and abort the transition process for this partition
+            helper.stateChangeLogger.error(s"$partitionLogMsgPrefix: cannot become follower " +
+              s"since the new leader ${state.leaderId} is unavailable.")
+            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
+            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
+            partition.createLogIfNotExists(isNew, isFutureReplica = false, highWatermarkCheckpoints)
+ } else {
+ val isrState = state.toLeaderAndIsrPartitionState(isNew)
+ if (partition.makeFollower(isrState, highWatermarkCheckpoints)) {
+ partitionsMadeFollower += partition
+ if (traceLoggingEnabled) {
+                helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: completed the " +
+                  s"become-follower state change with new leader ${state.leaderId}.")
+ }
+ } else {
+              helper.stateChangeLogger.info(s"$partitionLogMsgPrefix: skipped the " +
+                s"become-follower state change since " +
+                s"the new leader ${state.leaderId} is the same as the old leader.")
+ }
+ }
+ } catch {
+ case e: KafkaStorageException =>
+            helper.stateChangeLogger.error(s"$partitionLogMsgPrefix: unable to complete the " +
+              s"become-follower state change since the " +
+              s"replica for the partition is offline due to disk error $e")
+            val dirOpt = helper.getLogDir(partition.topicPartition)
+            helper.error(s"Error while making broker the follower with leader ${state.leaderId} in dir $dirOpt", e)
+ helper.markOffline(topicPartition)
+ }
+ }
+
+ if (partitionsMadeFollower.nonEmpty) {
+        helper.replicaFetcherManager.removeFetcherForPartitions(partitionsMadeFollower.map(_.topicPartition))
+        helper.stateChangeLogger.info(s"$topLevelLogPrefix: stopped followers for ${partitionsMadeFollower.size} partitions")
+
+ partitionsMadeFollower.foreach { partition =>
+          helper.completeDelayedFetchOrProduceRequests(partition.topicPartition)
+ }
+
+ if (helper.isShuttingDown) {
+ if (traceLoggingEnabled) {
+ partitionsMadeFollower.foreach { partition =>
+ val topicPartition = partition.topicPartition
+ val partitionLogMsgPrefix = if (deferredBatches)
+ s"Apply deferred follower partition $topicPartition"
+ else
+ s"Metadata batch ${metadataOffset.get} $topicPartition"
+              helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: skipped the " +
+ s"adding-fetcher step of the become-follower state for " +
+ s"$topicPartition since we are shutting down.")
+ }
+ }
+ } else {
+        // we do not need to check if the leader exists again since this has been done at the beginning of this process
+        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsMadeFollower.map { partition =>
+          val leader = allBrokersByIdMap(partition.leaderReplicaIdOpt.get).brokerEndPoint(helper.config.interBrokerListenerName)
+          val log = partition.localLogOrException
+          val fetchOffset = helper.initialFetchOffset(log)
+          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
+ }.toMap
+
+        helper.replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+ }
+ }
+ } catch {
+ case e: Throwable =>
+        helper.stateChangeLogger.error(s"$topLevelLogPrefix: error while processing batch", e)
+ // Re-throw the exception for it to be caught in BrokerMetadataListener
+ throw e
+ }
+
+ if (traceLoggingEnabled)
+ partitionsMadeFollower.foreach { partition =>
+ val topicPartition = partition.topicPartition
+ val state = partitionStates(partition)
+ val partitionLogMsgPrefix = if (deferredBatches)
+ s"Apply deferred follower partition $topicPartition"
+ else
+ s"Metadata batch ${metadataOffset.get} $topicPartition"
+        helper.stateChangeLogger.trace(s"$partitionLogMsgPrefix: completed become-follower " +
+          s"transition for partition $topicPartition with new leader ${state.leaderId}")
+ }
+
+ partitionsMadeFollower
+ }
+}
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
new file mode 100644
index 0000000..255b349
--- /dev/null
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.controller.StateChangeLogger
+import kafka.log.{Log, LogManager}
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.server.metadata.{ConfigRepository, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.Scheduler
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Set, mutable}
+
+class RaftReplicaManager(config: KafkaConfig,
+ metrics: Metrics,
+ time: Time,
+ scheduler: Scheduler,
+ logManager: LogManager,
+ isShuttingDown: AtomicBoolean,
+ quotaManagers: QuotaManagers,
+ brokerTopicStats: BrokerTopicStats,
+ metadataCache: RaftMetadataCache,
+ logDirFailureChannel: LogDirFailureChannel,
+                         delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
+                         delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
+                         delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
+                         delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
+                         threadNamePrefix: Option[String],
+                         configRepository: ConfigRepository,
+                         alterIsrManager: AlterIsrManager) extends ReplicaManager(
+  config, metrics, time, None, scheduler, logManager, isShuttingDown, quotaManagers,
+  brokerTopicStats, metadataCache, logDirFailureChannel, delayedProducePurgatory, delayedFetchPurgatory,
+  delayedDeleteRecordsPurgatory, delayedElectLeaderPurgatory, threadNamePrefix, configRepository, alterIsrManager) {
+
+ def this(config: KafkaConfig,
+ metrics: Metrics,
+ time: Time,
+ scheduler: Scheduler,
+ logManager: LogManager,
+ isShuttingDown: AtomicBoolean,
+ quotaManagers: QuotaManagers,
+ brokerTopicStats: BrokerTopicStats,
+ metadataCache: RaftMetadataCache,
+ logDirFailureChannel: LogDirFailureChannel,
+ alterIsrManager: AlterIsrManager,
+ configRepository: ConfigRepository,
+ threadNamePrefix: Option[String] = None) = {
+ this(config, metrics, time, scheduler, logManager, isShuttingDown,
+ quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
+ DelayedOperationPurgatory[DelayedProduce](
+ purgatoryName = "Produce", brokerId = config.brokerId,
+ purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
+ DelayedOperationPurgatory[DelayedFetch](
+ purgatoryName = "Fetch", brokerId = config.brokerId,
+ purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
+ DelayedOperationPurgatory[DelayedDeleteRecords](
+ purgatoryName = "DeleteRecords", brokerId = config.brokerId,
+ purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
+ DelayedOperationPurgatory[DelayedElectLeader](
+ purgatoryName = "ElectLeader", brokerId = config.brokerId),
+ threadNamePrefix, configRepository, alterIsrManager)
+ }
+
+ if (config.requiresZookeeper) {
+    throw new IllegalStateException(s"Cannot use ${getClass.getSimpleName} when using ZooKeeper")
+ }
+
+  class RaftReplicaManagerChangeDelegateHelper(raftReplicaManager: RaftReplicaManager) extends RaftReplicaChangeDelegateHelper {
+    override def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = raftReplicaManager.completeDelayedFetchOrProduceRequests(topicPartition)
+
+ override def config: KafkaConfig = raftReplicaManager.config
+
+    override def error(msg: => String, e: => Throwable): Unit = raftReplicaManager.error(msg, e)
+
+    override def getLogDir(topicPartition: TopicPartition): Option[String] = raftReplicaManager.getLogDir(topicPartition)
+
+    override def initialFetchOffset(log: Log): Long = raftReplicaManager.initialFetchOffset(log)
+
+    override def isShuttingDown: Boolean = raftReplicaManager.isShuttingDown.get
+
+    override def markDeferred(state: HostedPartition.Deferred): Unit = raftReplicaManager.markPartitionDeferred(state)
+
+    override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+
+    override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
+
+    override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager
+
+    override def stateChangeLogger: StateChangeLogger = raftReplicaManager.stateChangeLogger
+ }
+
+ // visible/overwriteable for testing, generally will not change otherwise
+  private[server] var delegate = new RaftReplicaChangeDelegate(new RaftReplicaManagerChangeDelegateHelper(this))
+
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+ private var deferringMetadataChanges: Boolean = true
+ stateChangeLogger.debug(s"Metadata changes are initially being deferred")
+
+ def beginMetadataChangeDeferral(): Unit = {
+ replicaStateChangeLock synchronized {
+ deferringMetadataChanges = true
+ stateChangeLogger.info(s"Metadata changes are now being deferred")
+ }
+ }
+
+  def endMetadataChangeDeferral(onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+ val startMs = time.milliseconds()
+ val partitionsMadeFollower = mutable.Set[Partition]()
+ val partitionsMadeLeader = mutable.Set[Partition]()
+ replicaStateChangeLock synchronized {
+ stateChangeLogger.info(s"Applying deferred metadata changes")
+      val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+ val metadataImage = metadataCache.currentImage()
+ val brokers = metadataImage.brokers
+ try {
+ val leaderPartitionStates = mutable.Map[Partition, MetadataPartition]()
+        val followerPartitionStates = mutable.Map[Partition, MetadataPartition]()
+ val partitionsAlreadyExisting = mutable.Set[MetadataPartition]()
+ deferredPartitionsIterator.foreach { deferredPartition =>
+ val partition = deferredPartition.partition
+ val state = cachedState(metadataImage, partition)
+ if (state.leaderId == localBrokerId) {
+ leaderPartitionStates.put(partition, state)
+ } else {
+ followerPartitionStates.put(partition, state)
+ }
+ if (!deferredPartition.isNew) {
+ partitionsAlreadyExisting += state
+ }
+ }
+
+ val partitionsMadeLeader = if (leaderPartitionStates.nonEmpty)
+          delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+ else
+ Set.empty[Partition]
+ val partitionsMadeFollower = if (followerPartitionStates.nonEmpty)
+          delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
+ else
+ Set.empty[Partition]
+
+      // We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
+ deferredPartitionsIterator.foreach { deferredPartition =>
+ val partition = deferredPartition.partition
+        allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
+ }
+
+      updateLeaderAndFollowerMetrics(partitionsMadeFollower.map(_.topic).toSet)
+
+      maybeAddLogDirFetchers(partitionsMadeFollower, highWatermarkCheckpoints)
+
+ replicaFetcherManager.shutdownIdleFetcherThreads()
+ replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+ if (partitionsMadeLeader.nonEmpty || partitionsMadeFollower.nonEmpty) {
+ onLeadershipChange(partitionsMadeLeader, partitionsMadeFollower)
+ }
+ } catch {
+ case e: Throwable =>
+ deferredPartitionsIterator.foreach { metadata =>
+ val partition = metadata.partition
+ val state = cachedState(metadataImage, partition)
+ val topicPartition = partition.topicPartition
+ val leader = state.leaderId == localBrokerId
+ val leaderOrFollower = if (leader) "leader" else "follower"
+            val partitionLogMsgPrefix = s"Apply deferred $leaderOrFollower partition $topicPartition"
+            stateChangeLogger.error(s"$partitionLogMsgPrefix: error while applying deferred metadata.", e)
+ }
+          stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions prior to the error: " +
+            s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s)")
+          // Re-throw the exception for it to be caught in BrokerMetadataListener
+ throw e
+ }
+ deferringMetadataChanges = false
+ }
+ val endMs = time.milliseconds()
+ val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Applied ${partitionsMadeLeader.size + partitionsMadeFollower.size} deferred partitions: " +
+      s"${partitionsMadeLeader.size} leader(s) and ${partitionsMadeFollower.size} follower(s) " +
+      s"in $elapsedMs ms")
+ stateChangeLogger.info("Metadata changes are no longer being deferred")
+ }
+
+ /**
+ * Handle changes made by a batch of metadata log records.
+ *
+ * @param imageBuilder The MetadataImage builder.
+ * @param metadataOffset The last offset in the batch of records.
+ * @param onLeadershipChange The callbacks to invoke when leadership changes.
+ */
+ def handleMetadataRecords(imageBuilder: MetadataImageBuilder,
+ metadataOffset: Long,
+                            onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
+ val startMs = time.milliseconds()
+ val builder = imageBuilder.partitionsBuilder()
+ replicaStateChangeLock synchronized {
+      stateChangeLogger.info(("Metadata batch %d: %d local partition(s) changed, %d " +
+        "local partition(s) removed.").format(metadataOffset, builder.localChanged().size,
+ builder.localRemoved().size))
+ if (stateChangeLogger.isTraceEnabled) {
+ builder.localChanged().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally changed: ${state}")
+ }
+ builder.localRemoved().foreach { state =>
+          stateChangeLogger.trace(s"Metadata batch $metadataOffset: locally removed: ${state}")
+ }
+ }
+ if (deferringMetadataChanges) {
+ val prevPartitions = imageBuilder.prevImage.partitions
+        // partitionChangesToBeDeferred maps each partition to be deferred to whether it is new (i.e. did not exist before deferral began)
+        val partitionChangesToBeDeferred = mutable.HashMap[Partition, Boolean]()
+ builder.localChanged().foreach { currentState =>
+ val topicPartition = currentState.toTopicPartition
+          val (partition, priorDeferredMetadata) = getPartition(topicPartition) match {
+ case HostedPartition.Offline =>
+              stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+                s"for partition $topicPartition as the local replica for the partition is " +
+                "in an offline log directory")
+ (None, None)
+
+ case HostedPartition.Online(partition) => (Some(partition), None)
+            case deferred@HostedPartition.Deferred(partition, _) => (Some(partition), Some(deferred))
+
+ case HostedPartition.None =>
+ // Create the partition instance since it does not yet exist
+              (Some(Partition(topicPartition, time, configRepository, this)), None)
+ }
+ partition.foreach { partition =>
+ val isNew = priorDeferredMetadata match {
+ case Some(alreadyDeferred) => alreadyDeferred.isNew
+              case _ => prevPartitions.topicPartition(topicPartition.topic(), topicPartition.partition()).isEmpty
+ }
+ partitionChangesToBeDeferred.put(partition, isNew)
+ }
+ }
+
+        stateChangeLogger.info(s"Deferring metadata changes for ${partitionChangesToBeDeferred.size} partition(s)")
+ if (partitionChangesToBeDeferred.nonEmpty) {
+ delegate.makeDeferred(partitionChangesToBeDeferred, metadataOffset)
+ }
+ } else { // not deferring changes, so make leaders/followers accordingly
+        val partitionsToBeLeader = mutable.HashMap[Partition, MetadataPartition]()
+        val partitionsToBeFollower = mutable.HashMap[Partition, MetadataPartition]()
+ builder.localChanged().foreach { currentState =>
+ val topicPartition = currentState.toTopicPartition
+ val partition = getPartition(topicPartition) match {
+ case HostedPartition.Offline =>
+            stateChangeLogger.warn(s"Ignoring handlePartitionChanges at $metadataOffset " +
+              s"for partition $topicPartition as the local replica for the partition is " +
+              "in an offline log directory")
+ None
+
+ case HostedPartition.Online(partition) => Some(partition)
+          case _: HostedPartition.Deferred => throw new IllegalStateException(
+            s"There should never be deferred partition metadata when we aren't deferring changes: $topicPartition")
+
+ case HostedPartition.None =>
+            // it's a partition that we don't know about yet, so create it and mark it online
+            val partition = Partition(topicPartition, time, configRepository, this)
+            allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
+ Some(partition)
+ }
+ partition.foreach { partition =>
+ if (currentState.leaderId == localBrokerId) {
+ partitionsToBeLeader.put(partition, currentState)
+ } else {
+ partitionsToBeFollower.put(partition, currentState)
+ }
+ }
+ }
+
+ val prevPartitions = imageBuilder.prevImage.partitions
+        val changedPartitionsPreviouslyExisting = mutable.Set[MetadataPartition]()
+ builder.localChanged().foreach(metadataPartition =>
+          prevPartitions.topicPartition(metadataPartition.topicName, metadataPartition.partitionIndex).foreach(
+ changedPartitionsPreviouslyExisting.add))
+ val nextBrokers = imageBuilder.brokers()
+        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+ val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
+          delegate.makeLeaders(changedPartitionsPreviouslyExisting, partitionsToBeLeader, highWatermarkCheckpoints,
+ Some(metadataOffset))
+ else
+ Set.empty[Partition]
+ val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
+          delegate.makeFollowers(changedPartitionsPreviouslyExisting, nextBrokers, partitionsToBeFollower, highWatermarkCheckpoints,
+ Some(metadataOffset))
+ else
+ Set.empty[Partition]
+        updateLeaderAndFollowerMetrics(partitionsBecomeFollower.map(_.topic).toSet)
+
+ builder.localChanged().foreach { state =>
+ val topicPartition = state.toTopicPartition
+ /*
+           * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+           * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
+           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+ * we need to map this topic-partition to OfflinePartition instead.
+ */
+ if (localLog(topicPartition).isEmpty) {
+ markPartitionOffline(topicPartition)
+ }
+ }
+
+        maybeAddLogDirFetchers(partitionsBecomeFollower, highWatermarkCheckpoints)
+
+ replicaFetcherManager.shutdownIdleFetcherThreads()
+ replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+ onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+ }
+      // TODO: we should move aside log directories which have been deleted rather than
+ // purging them from the disk immediately.
+ if (builder.localRemoved().nonEmpty) {
+ // we schedule removal immediately even if we are deferring changes
+        stopPartitions(builder.localRemoved().map(_.toTopicPartition -> true).toMap).foreach { case (topicPartition, e) =>
+ if (e.isInstanceOf[KafkaStorageException]) {
+            stateChangeLogger.error(s"Metadata batch $metadataOffset: unable to delete " +
+              s"${topicPartition} as the local replica for the partition is in an offline " +
+ "log directory")
+ } else {
+            stateChangeLogger.error(s"Metadata batch $metadataOffset: unable to delete " +
+              s"${topicPartition} due to an unexpected ${e.getClass.getName} exception: " +
+ s"${e.getMessage}")
+ }
+ }
+ }
+ }
+ val endMs = time.milliseconds()
+ val elapsedMs = endMs - startMs
+    stateChangeLogger.info(s"Metadata batch $metadataOffset: handled replica changes " +
+ s"in ${elapsedMs} ms")
+ }
+
+ def markPartitionDeferred(partition: Partition, isNew: Boolean): Unit = {
+ markPartitionDeferred(HostedPartition.Deferred(partition, isNew))
+ }
+
+  private def markPartitionDeferred(state: HostedPartition.Deferred): Unit = replicaStateChangeLock synchronized {
+ allPartitions.put(state.partition.topicPartition, state)
+ }
+
+  // An iterator over all deferred partitions. This is a weakly consistent iterator; a partition made off/online
+  // after the iterator has been constructed could still be returned by this iterator.
+  private def deferredPartitionsIterator: Iterator[HostedPartition.Deferred] = {
+ allPartitions.values.iterator.flatMap {
+ case deferred: HostedPartition.Deferred => Some(deferred)
+ case _ => None
+ }
+ }
+
+  private def cachedState(metadataImage: MetadataImage, partition: Partition): MetadataPartition = {
+    metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
+      throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
+ }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e639727..3f52e22 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -163,7 +163,7 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
sealed trait HostedPartition
/**
- * Trait to represent a partition that isn't Offline -- i.e. it is either Online or it is Deferred
+ * Trait to represent a partition that isn't Offline -- i.e. it is either Online or it is Deferred.
*/
sealed trait NonOffline extends HostedPartition {
val partition: Partition
@@ -180,12 +180,13 @@ object HostedPartition {
final case class Online(partition: Partition) extends NonOffline
/**
- * This broker hosted the partition but it is deferring metadata changes
- * until it catches up on the Raft-based metadata log.
- * This state only applies to brokers that are using a Raft-based metadata
- * quorum; it never happens when using ZooKeeper.
+ * This broker hosted the partition (or will soon host it if it is new) but
+ * it is deferring metadata changes until it catches up on the Raft-based metadata
+ * log. This state only applies to brokers that are using a Raft-based metadata
+ * quorum; it never happens when using ZooKeeper. The isNew value indicates if the partition needs to be created when we apply the deferred changes.
*/
- final case class Deferred(partition: Partition) extends NonOffline
+  final case class Deferred(partition: Partition, isNew: Boolean) extends NonOffline
/**
* This broker hosts the partition, but it is in an offline log directory.
@@ -247,20 +248,20 @@ class ReplicaManager(val config: KafkaConfig,
}
/* epoch of the controller that last changed the leader */
- @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
- private val localBrokerId = config.brokerId
- private val allPartitions = new Pool[TopicPartition, HostedPartition](
+  @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
+ protected val localBrokerId = config.brokerId
+ protected val allPartitions = new Pool[TopicPartition, HostedPartition](
     valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, configRepository, this)))
)
- private val replicaStateChangeLock = new Object
+ protected val replicaStateChangeLock = new Object
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
-  val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
+  private[server] val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  @volatile var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
+  @volatile private[server] var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
-  private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
+  protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private var logDirFailureHandler: LogDirFailureHandler = null
@@ -275,33 +276,6 @@ class ReplicaManager(val config: KafkaConfig,
}
}
-  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
-  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
-  // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition
-  // online partitions to the deferred state if we see a metadata update for that partition.
-  private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
-  stateChangeLogger.debug(s"Metadata changes deferred=$deferringMetadataChanges")
-
- def beginMetadataChangeDeferral(): Unit = {
- if (config.requiresZookeeper) {
-      throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
- }
- replicaStateChangeLock synchronized {
- deferringMetadataChanges = true
- stateChangeLogger.info(s"Metadata changes are now being deferred")
- }
- }
-
- def endMetadataChangeDeferral(): Unit = {
- if (config.requiresZookeeper) {
-      throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
- }
- replicaStateChangeLock synchronized {
- deferringMetadataChanges = false
- stateChangeLogger.info(s"Metadata changes are no longer being deferred")
- }
- }
-
// Visible for testing
   private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
@@ -367,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig,
brokerTopicStats.removeMetrics(topic)
}
-  private def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
+  protected def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
@@ -399,9 +373,9 @@ class ReplicaManager(val config: KafkaConfig,
} else {
this.controllerEpoch = controllerEpoch
-      val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
+ val stoppedPartitions = mutable.Map.empty[TopicPartition, Boolean]
partitionStates.forKeyValue { (topicPartition, partitionState) =>
- val deletePartition = partitionState.deletePartition
+ val deletePartition = partitionState.deletePartition()
getPartition(topicPartition) match {
case HostedPartition.Offline =>
@@ -421,7 +395,7 @@ class ReplicaManager(val config: KafkaConfig,
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
requestLeaderEpoch > currentLeaderEpoch) {
- stoppedPartitions += topicPartition -> partitionState
+ stoppedPartitions += topicPartition -> deletePartition
             // Assume that everything will go right. It is overwritten in case of an error.
responseMap.put(topicPartition, Errors.NONE)
} else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -439,71 +413,84 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
}
- case HostedPartition.Deferred(_) =>
+ case _: HostedPartition.Deferred =>
           throw new IllegalStateException("We should never be deferring partition metadata changes and stopping a replica when using ZooKeeper")
case HostedPartition.None =>
           // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
           // This could happen when topic is being deleted while broker is down and recovers.
- stoppedPartitions += topicPartition -> partitionState
+ stoppedPartitions += topicPartition -> deletePartition
responseMap.put(topicPartition, Errors.NONE)
}
}
- // First stop fetchers for all partitions.
- val partitions = stoppedPartitions.keySet
- replicaFetcherManager.removeFetcherForPartitions(partitions)
- replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
-
-      // Second remove deleted partitions from the partition map. Fetchers rely on the
-      // ReplicaManager to get Partition's information so they must be stopped first.
- val deletedPartitions = mutable.Set.empty[TopicPartition]
- stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
- if (partitionState.deletePartition) {
- getPartition(topicPartition) match {
-            case hostedPartition@HostedPartition.Online(partition) =>
- if (allPartitions.remove(topicPartition, hostedPartition)) {
- maybeRemoveTopicMetrics(topicPartition.topic)
-                // Logs are not deleted here. They are deleted in a single batch later on.
-                // This is done to avoid having to checkpoint for every deletions.
- partition.delete()
- }
-
- case _ =>
- }
-
- deletedPartitions += topicPartition
- }
-
-        // If we were the leader, we may have some operations still waiting for completion.
- // We force completion to prevent them from timing out.
- completeDelayedFetchOrProduceRequests(topicPartition)
- }
-
- // Third delete the logs and checkpoint.
-      logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
- exception match {
- case e: KafkaStorageException =>
+ stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+ if (e.isInstanceOf[KafkaStorageException]) {
             stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
               s"controller $controllerId with correlation id $correlationId " +
               s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
               "partition is in an offline log directory")
- responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-
- case e =>
-            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
+          } else {
+            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
               s"controller $controllerId with correlation id $correlationId " +
               s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
s"${e.getClass.getName} exception: ${e.getMessage}")
responseMap.put(topicPartition, Errors.forException(e))
}
- })
-
+ responseMap.put(topicPartition, Errors.forException(e))
+ }
(responseMap, Errors.NONE)
}
}
}
+ /**
+ * Stop the given partitions.
+ *
+   * @param partitionsToStop    A map from a topic partition to a boolean indicating
+ * whether the partition should be deleted.
+ *
+   * @return                    A map from partitions to exceptions which occurred.
+ * If no errors occurred, the map will be empty.
+ */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+ // First stop fetchers for all partitions.
+ val partitions = partitionsToStop.keySet
+ replicaFetcherManager.removeFetcherForPartitions(partitions)
+ replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+    // Second remove deleted partitions from the partition map. Fetchers rely on the
+    // ReplicaManager to get Partition's information so they must be stopped first.
+ val partitionsToDelete = mutable.Set.empty[TopicPartition]
+ partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
+ if (shouldDelete) {
+ getPartition(topicPartition) match {
+ case hostedPartition: NonOffline =>
+ if (allPartitions.remove(topicPartition, hostedPartition)) {
+ maybeRemoveTopicMetrics(topicPartition.topic)
+            // Logs are not deleted here. They are deleted in a single batch later on.
+            // This is done to avoid having to checkpoint for every deletions.
+ hostedPartition.partition.delete()
+ }
+
+ case _ =>
+ }
+ partitionsToDelete += topicPartition
+ }
+      // If we were the leader, we may have some operations still waiting for completion.
+ // We force completion to prevent them from timing out.
+ completeDelayedFetchOrProduceRequests(topicPartition)
+ }
+
+ // Third delete the logs and checkpoint.
+ val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+ if (partitionsToDelete.nonEmpty) {
+ // Delete the logs and checkpoint.
+      logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
+ }
+ errorMap
+ }
+
def getPartition(topicPartition: TopicPartition): HostedPartition = {
Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None)
}
@@ -569,7 +556,7 @@ class ReplicaManager(val config: KafkaConfig,
// the local replica has been deleted.
Left(Errors.NOT_LEADER_OR_FOLLOWER)
- case HostedPartition.Deferred(_) =>
+ case _: HostedPartition.Deferred =>
// The topic exists, but this broker is deferring metadata changes for
it, so we return NOT_LEADER_OR_FOLLOWER
// which forces clients to refresh metadata.
Left(Errors.NOT_LEADER_OR_FOLLOWER)
@@ -766,7 +753,7 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.Offline =>
         throw new KafkaStorageException(s"Partition $topicPartition is offline")
- case HostedPartition.Deferred(_) =>
+ case _: HostedPartition.Deferred =>
         throw new IllegalStateException(s"Partition $topicPartition is deferred")
case HostedPartition.None => // Do nothing
@@ -1365,7 +1352,7 @@ class ReplicaManager(val config: KafkaConfig,
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
None
- case HostedPartition.Deferred(_) =>
+ case _: HostedPartition.Deferred =>
         throw new IllegalStateException("We should never be deferring partition metadata changes and becoming a leader or follower when using ZooKeeper")
case HostedPartition.Online(partition) =>
@@ -1426,18 +1413,8 @@ class ReplicaManager(val config: KafkaConfig,
else
Set.empty[Partition]
- /*
- * KAFKA-8392
-     * For topic partitions of which the broker is no longer a leader, delete metrics related to
-     * those topics. Note that this means the broker stops being either a replica or a leader of
- * partitions of said topics
- */
- val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
-      followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
-
- // remove metrics for brokers which are not followers of a topic
-      leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
+ updateLeaderAndFollowerMetrics(followerTopicSet)
leaderAndIsrRequest.partitionStates.forEach { partitionState =>
       val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
@@ -1520,7 +1497,21 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- private def maybeAddLogDirFetchers(partitions: Set[Partition],
+ /**
+ * KAFKA-8392
+   * For topic partitions of which the broker is no longer a leader, delete metrics related to
+   * those topics. Note that this means the broker stops being either a replica or a leader of
+ * partitions of said topics
+ */
+  protected def updateLeaderAndFollowerMetrics(newFollowerTopics: Set[String]): Unit = {
+ val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
+    newFollowerTopics.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
+
+ // remove metrics for brokers which are not followers of a topic
+    leaderTopicSet.diff(newFollowerTopics).foreach(brokerTopicStats.removeOldFollowerMetrics)
+ }
+
+ protected def maybeAddLogDirFetchers(partitions: Set[Partition],
                                        offsetCheckpoints: OffsetCheckpoints): Unit = {
     val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
for (partition <- partitions) {
@@ -1753,7 +1744,7 @@ class ReplicaManager(val config: KafkaConfig,
    * diverging epoch is returned in the response, avoiding the need for a separate
* OffsetForLeaderEpoch request.
*/
- private def initialFetchOffset(log: Log): Long = {
+ protected def initialFetchOffset(log: Log): Long = {
     if (ApiVersion.isTruncationOnFetchSupported(config.interBrokerProtocolVersion) && log.latestEpoch.nonEmpty)
log.logEndOffset
else
@@ -1841,7 +1832,6 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- // Used only by test
   def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized {
allPartitions.put(tp, HostedPartition.Offline)
Partition.removeMetrics(tp)
@@ -1964,7 +1954,7 @@ class ReplicaManager(val config: KafkaConfig,
.setPartition(offsetForLeaderPartition.partition)
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
- case HostedPartition.Deferred(_) =>
+ case _: HostedPartition.Deferred =>
new EpochEndOffset()
.setPartition(offsetForLeaderPartition.partition)
.setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataImage.scala b/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
index 1993065..f4e5831 100755
--- a/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
+++ b/core/src/main/scala/kafka/server/metadata/MetadataImage.scala
@@ -86,12 +86,15 @@ case class MetadataImageBuilder(brokerId: Int,
} else {
_partitionsBuilder.build()
}
- val nextBrokers = if (_brokersBuilder == null) {
+ MetadataImage(nextPartitions, _controllerId, brokers())
+ }
+
+ def brokers(): MetadataBrokers = {
+ if (_brokersBuilder == null) {
prevImage.brokers
} else {
_brokersBuilder.build()
}
- MetadataImage(nextPartitions, _controllerId, nextBrokers)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
new file mode 100644
index 0000000..d3d6471
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.cluster.Partition
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.metadata.{CachedConfigRepository, MetadataBroker, MetadataBrokers, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.metadata.PartitionRecord
+import org.apache.kafka.common.metrics.Metrics
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, never, verify, when}
+import org.slf4j.Logger
+
+import scala.collection.{Set, mutable}
+
+trait LeadershipChangeHandler {
+  def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]): Unit
+}
+
+class RaftReplicaManagerTest {
+ private var alterIsrManager: AlterIsrManager = _
+ private var config: KafkaConfig = _
+ private val configRepository = new CachedConfigRepository()
+ private val metrics = new Metrics
+ private var quotaManager: QuotaManagers = _
+ private val time = new MockTime
+ private var mockDelegate: RaftReplicaChangeDelegate = _
+ private var imageBuilder: MetadataImageBuilder = _
+ private val brokerId0 = 0
+  private val metadataBroker0 = new MetadataBroker(brokerId0, null, Map.empty, false)
+ private val brokerId1 = 1
+  private val metadataBroker1 = new MetadataBroker(brokerId1, null, Map.empty, false)
+ private val topicName = "topicName"
+ private val topicId = Uuid.randomUuid()
+ private val partitionId0 = 0
+ private val partitionId1 = 1
+ private val topicPartition0 = new TopicPartition(topicName, partitionId0)
+ private val topicPartition1 = new TopicPartition(topicName, partitionId1)
+ private val topicPartitionRecord0 = new PartitionRecord()
+ .setPartitionId(partitionId0)
+ .setTopicId(topicId)
+ .setReplicas(util.Arrays.asList(brokerId0, brokerId1))
+ .setLeader(brokerId0)
+ .setLeaderEpoch(0)
+ private val topicPartitionRecord1 = new PartitionRecord()
+ .setPartitionId(partitionId1)
+ .setTopicId(topicId)
+ .setReplicas(util.Arrays.asList(brokerId0, brokerId1))
+ .setLeader(brokerId1)
+ .setLeaderEpoch(0)
+ private val offset1 = 1L
+  private val metadataPartition0 = MetadataPartition(topicName, topicPartitionRecord0)
+  private val metadataPartition1 = MetadataPartition(topicName, topicPartitionRecord1)
+ private var onLeadershipChangeHandler: LeadershipChangeHandler = _
+  private var onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit = _
+ private var metadataCache: RaftMetadataCache = _
+
+ @BeforeEach
+ def setUp(): Unit = {
+ alterIsrManager = mock(classOf[AlterIsrManager])
+ config = KafkaConfig.fromProps({
+ val nodeId = brokerId0
+ val props = TestUtils.createBrokerConfig(nodeId, "")
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ props
+ })
+ metadataCache = new RaftMetadataCache(config.brokerId)
+ quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+ mockDelegate = mock(classOf[RaftReplicaChangeDelegate])
+    imageBuilder = MetadataImageBuilder(brokerId0, mock(classOf[Logger]), new MetadataImage())
+ onLeadershipChangeHandler = mock(classOf[LeadershipChangeHandler])
+ onLeadershipChange = onLeadershipChangeHandler.onLeadershipChange _
+ }
+
+ @AfterEach
+ def tearDown(): Unit = {
+ TestUtils.clearYammerMetrics()
+ Option(quotaManager).foreach(_.shutdown())
+ metrics.close()
+ }
+
+ def createRaftReplicaManager(): RaftReplicaManager = {
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    new RaftReplicaManager(config, metrics, time, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
+      metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager,
+ configRepository, None)
+ }
+
+ @Test
+ def testRejectsZkConfig(): Unit = {
+ assertThrows(classOf[IllegalStateException], () => {
+ val zkConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, ""))
+      val mockLogMgr = TestUtils.createLogManager(zkConfig.logDirs.map(new File(_)))
+      new RaftReplicaManager(zkConfig, metrics, time, new MockScheduler(time), mockLogMgr,
+        new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
+        metadataCache, new LogDirFailureChannel(config.logDirs.size), alterIsrManager,
+ configRepository)
+ })
+ }
+
+ @Test
+ def testDefersChangesImmediatelyThenAppliesChanges(): Unit = {
+ val rrm = createRaftReplicaManager()
+ rrm.delegate = mockDelegate
+ val partition0 = Partition(topicPartition0, time, configRepository, rrm)
+ val partition1 = Partition(topicPartition1, time, configRepository, rrm)
+
+ processTopicPartitionMetadata(rrm)
+ // verify changes would have been deferred
+    val partitionsNewMapCaptor: ArgumentCaptor[mutable.Map[Partition, Boolean]] =
+      ArgumentCaptor.forClass(classOf[mutable.Map[Partition, Boolean]])
+    verify(mockDelegate).makeDeferred(partitionsNewMapCaptor.capture(), ArgumentMatchers.eq(offset1))
+ val partitionsDeferredMap = partitionsNewMapCaptor.getValue
+    assertEquals(Map(partition0 -> true, partition1 -> true), partitionsDeferredMap)
+    verify(mockDelegate, never()).makeFollowers(any(), any(), any(), any(), any())
+
+    // now mark those topic partitions as being deferred so we can later apply the changes
+ rrm.markPartitionDeferred(partition0, isNew = true)
+ rrm.markPartitionDeferred(partition1, isNew = true)
+
+ // apply the changes
+ // define some return values to avoid NPE
+    when(mockDelegate.makeLeaders(any(), any(), any(), any())).thenReturn(Set(partition0))
+    when(mockDelegate.makeFollowers(any(), any(), any(), any(), any())).thenReturn(Set(partition1))
+ rrm.endMetadataChangeDeferral(onLeadershipChange)
+ // verify that the deferred changes would have been applied
+
+ // leaders...
+ val leaderPartitionStates = verifyMakeLeaders(mutable.Set(), None)
+ assertEquals(Map(partition0 -> metadataPartition0), leaderPartitionStates)
+
+ // followers...
+    val followerPartitionStates = verifyMakeFollowers(mutable.Set(), Set(brokerId0, brokerId1), None)
+    assertEquals(Map(partition1 -> metadataPartition1), followerPartitionStates)
+
+ // leadership change callbacks
+ verifyLeadershipChangeCallbacks(List(partition0), List(partition1))
+ }
+
+ @Test
+ def testAppliesChangesWhenNotDeferring(): Unit = {
+ val rrm = createRaftReplicaManager()
+ rrm.delegate = mockDelegate
+ val partition0 = Partition(topicPartition0, time, configRepository, rrm)
+ val partition1 = Partition(topicPartition1, time, configRepository, rrm)
+ rrm.endMetadataChangeDeferral(onLeadershipChange)
+
+ // define some return values to avoid NPE
+    when(mockDelegate.makeLeaders(any(), any(), any(), ArgumentMatchers.eq(Some(offset1)))).thenReturn(Set(partition0))
+    when(mockDelegate.makeFollowers(any(), any(), any(), any(), ArgumentMatchers.eq(Some(offset1)))).thenReturn(Set(partition1))
+ // process the changes
+ processTopicPartitionMetadata(rrm)
+ // verify that the changes would have been applied
+
+ // leaders...
+ val leaderPartitionStates = verifyMakeLeaders(mutable.Set(), Some(offset1))
+ assertEquals(Map(partition0 -> metadataPartition0), leaderPartitionStates)
+
+ // followers...
+    val followerPartitionStates = verifyMakeFollowers(mutable.Set(), Set(brokerId0, brokerId1), Some(offset1))
+    assertEquals(Map(partition1 -> metadataPartition1), followerPartitionStates)
+
+ // leadership change callbacks
+ verifyLeadershipChangeCallbacks(List(partition0), List(partition1))
+ }
+
+  private def verifyMakeLeaders(expectedPrevPartitionsAlreadyExisting: Set[MetadataPartition],
+                                expectedMetadataOffset: Option[Long]): mutable.Map[Partition, MetadataPartition] = {
+    val leaderPartitionStatesCaptor: ArgumentCaptor[mutable.Map[Partition, MetadataPartition]] =
+      ArgumentCaptor.forClass(classOf[mutable.Map[Partition, MetadataPartition]])
+    verify(mockDelegate).makeLeaders(ArgumentMatchers.eq(expectedPrevPartitionsAlreadyExisting),
+      leaderPartitionStatesCaptor.capture(), any(), ArgumentMatchers.eq(expectedMetadataOffset))
+ leaderPartitionStatesCaptor.getValue
+ }
+
+  private def verifyMakeFollowers(expectedPrevPartitionsAlreadyExisting: Set[MetadataPartition],
+                                  expectedBrokers: Set[Int],
+                                  expectedMetadataOffset: Option[Long]): mutable.Map[Partition, MetadataPartition] = {
+    val followerPartitionStatesCaptor: ArgumentCaptor[mutable.Map[Partition, MetadataPartition]] =
+      ArgumentCaptor.forClass(classOf[mutable.Map[Partition, MetadataPartition]])
+    val brokersCaptor: ArgumentCaptor[MetadataBrokers] = ArgumentCaptor.forClass(classOf[MetadataBrokers])
+    verify(mockDelegate).makeFollowers(ArgumentMatchers.eq(expectedPrevPartitionsAlreadyExisting), brokersCaptor.capture(),
+      followerPartitionStatesCaptor.capture(), any(), ArgumentMatchers.eq(expectedMetadataOffset))
+ val brokers = brokersCaptor.getValue
+ assertEquals(expectedBrokers.size, brokers.size())
+    expectedBrokers.foreach(brokerId => assertTrue(brokers.aliveBroker(brokerId).isDefined))
+ followerPartitionStatesCaptor.getValue
+ }
+
+  private def verifyLeadershipChangeCallbacks(expectedUpdatedLeaders: List[Partition], expectedUpdatedFollowers: List[Partition]): Unit = {
+    val updatedLeadersCaptor: ArgumentCaptor[Iterable[Partition]] = ArgumentCaptor.forClass(classOf[Iterable[Partition]])
+    val updatedFollowersCaptor: ArgumentCaptor[Iterable[Partition]] = ArgumentCaptor.forClass(classOf[Iterable[Partition]])
+    verify(onLeadershipChangeHandler).onLeadershipChange(updatedLeadersCaptor.capture(), updatedFollowersCaptor.capture())
+ assertEquals(expectedUpdatedLeaders, updatedLeadersCaptor.getValue.toList)
+    assertEquals(expectedUpdatedFollowers, updatedFollowersCaptor.getValue.toList)
+ }
+
+  private def processTopicPartitionMetadata(raftReplicaManager: RaftReplicaManager): Unit = {
+ // create brokers
+ imageBuilder.brokersBuilder().add(metadataBroker0)
+ imageBuilder.brokersBuilder().add(metadataBroker1)
+ // create topic
+ imageBuilder.partitionsBuilder().addUuidMapping(topicName, topicId)
+ // create deferred partitions
+ imageBuilder.partitionsBuilder().set(metadataPartition0)
+ imageBuilder.partitionsBuilder().set(metadataPartition1)
+ // apply the changes to metadata cache
+ metadataCache.image(imageBuilder.build())
+ // apply the changes to replica manager
+    raftReplicaManager.handleMetadataRecords(imageBuilder, offset1, onLeadershipChange)
+ }
+}