This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 2a4ee610037 KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically (#15335) 2a4ee610037 is described below commit 2a4ee610037aae2faac35be65efe0a4043c360b2 Author: Omnia Ibrahim <o.g.h.ibra...@gmail.com> AuthorDate: Sat Apr 6 07:36:26 2024 +0100 KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically (#15335) This PR fixes the bug introduced by #15263, which caused topic partitions to be re-created whenever their original log dir is offline: Log directory failure re-creates partitions in another logdir automatically Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>, Igor Soarez <soa...@apple.com>, Gaurav Narula <gaurav_naru...@apple.com>, Proven Provenzano <pprovenz...@confluent.io> --- core/src/main/scala/kafka/cluster/Partition.scala | 26 ++++-- core/src/main/scala/kafka/log/LogManager.scala | 94 +++++++++++----------- .../main/scala/kafka/server/ReplicaManager.scala | 4 +- .../server/metadata/BrokerMetadataPublisher.scala | 11 +-- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 11 +-- .../test/scala/unit/kafka/log/LogManagerTest.scala | 61 +++++++++----- .../unit/kafka/server/ReplicaManagerTest.scala | 28 +++++++ .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +- .../partition/PartitionMakeFollowerBenchmark.java | 2 +- .../UpdateFollowerFetchStateBenchmark.java | 2 +- 10 files changed, 150 insertions(+), 91 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 02e71f7445c..01946160a5f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition, object Partition { private val metricsGroup = new KafkaMetricsGroup(classOf[Partition]) - def apply(topicPartition: TopicPartition, + def 
apply(topicIdPartition: TopicIdPartition, time: Time, replicaManager: ReplicaManager): Partition = { + Partition( + topicPartition = topicIdPartition.topicPartition(), + topicId = Option(topicIdPartition.topicId()), + time = time, + replicaManager = replicaManager) + } + def apply(topicPartition: TopicPartition, + time: Time, + replicaManager: ReplicaManager, + topicId: Option[Uuid] = None): Partition = { val isrChangeListener = new AlterPartitionListener { override def markIsrExpand(): Unit = { @@ -127,6 +137,7 @@ object Partition { replicaManager.delayedDeleteRecordsPurgatory) new Partition(topicPartition, + _topicId = topicId, replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs, interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, localBrokerId = replicaManager.config.brokerId, @@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition, delayedOperations: DelayedOperations, metadataCache: MetadataCache, logManager: LogManager, - alterIsrManager: AlterPartitionManager) extends Logging { + alterIsrManager: AlterPartitionManager, + @volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code by KAFKA-16212 + ) extends Logging { import Partition.metricsGroup - def topic: String = topicPartition.topic def partitionId: Int = topicPartition.partition @@ -570,11 +582,15 @@ class Partition(val topicPartition: TopicPartition, } /** + * Return either the value of _topicId if it is provided or return the topic id attached to the log itself. + * If _topicId is empty then the method will fetch topicId from the log and update _topicId. * @return the topic ID for the log or None if the log or the topic ID does not exist. 
*/ def topicId: Option[Uuid] = { - val log = this.log.orElse(logManager.getLog(topicPartition)) - log.flatMap(_.topicId) + if (_topicId.isEmpty || _topicId.contains(Uuid.ZERO_UUID)) { + _topicId = this.log.orElse(logManager.getLog(topicPartition)).flatMap(_.topicId) + } + _topicId } // remoteReplicas will be called in the hot path, and must be inexpensive diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 6dd8a9d2a67..1d35aa59db1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -325,7 +325,8 @@ class LogManager(logDirs: Seq[File], logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig], - numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog = { + numRemainingSegments: ConcurrentMap[String, Int], + isStray: UnifiedLog => Boolean): UnifiedLog = { val topicPartition = UnifiedLog.parseTopicPartitionName(logDir) val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig) val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) @@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File], } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) { addStrayLog(topicPartition, log) warn(s"Loaded stray log: $logDir") + } else if (isStray(log)) { + // Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted. + // A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica, + // and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories. + // So upon a restart in which the offline directory is back online we need to clean up the old replica directory. 
+ log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false) + addStrayLog(log.topicPartition, log) + warn(s"Log in ${logDir.getAbsolutePath} marked stray and renamed to ${log.dir.getAbsolutePath}") } else { val previous = { if (log.isFuture) @@ -399,7 +408,7 @@ class LogManager(logDirs: Seq[File], /** * Recover and load all logs in the given data directories */ - private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = { + private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig], isStray: UnifiedLog => Boolean): Unit = { info(s"Loading logs from log dirs $liveLogDirs") val startMs = time.hiResClockMs() val threadPools = ArrayBuffer.empty[ExecutorService] @@ -480,7 +489,7 @@ class LogManager(logDirs: Seq[File], val logLoadStartMs = time.hiResClockMs() try { log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, - defaultConfig, topicConfigOverrides, numRemainingSegments)) + defaultConfig, topicConfigOverrides, numRemainingSegments, isStray)) } catch { case e: IOException => handleIOException(logDirAbsolutePath, e) @@ -564,20 +573,10 @@ class LogManager(logDirs: Seq[File], /** * Start the background threads to flush logs and do log cleanup */ - def startup(topicNames: Set[String]): Unit = { + def startup(topicNames: Set[String], isStray: UnifiedLog => Boolean = _ => false): Unit = { // ensure consistency between default config and overrides val defaultConfig = currentDefaultConfig - startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames)) - } - - def deleteStrayKRaftReplicas( - brokerId: Int, - image: TopicsImage - ): Unit = { - val strayPartitions = findStrayReplicas(brokerId, image, allLogs) - strayPartitions.foreach(topicPartition => { - asyncDelete(topicPartition, false, false, true) - }) + startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, 
topicNames), isStray) } // visible for testing @@ -616,8 +615,11 @@ class LogManager(logDirs: Seq[File], } // visible for testing - private[log] def startupWithConfigOverrides(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = { - loadLogs(defaultConfig, topicConfigOverrides) // this could take a while if shutdown was not clean + private[log] def startupWithConfigOverrides( + defaultConfig: LogConfig, + topicConfigOverrides: Map[String, LogConfig], + isStray: UnifiedLog => Boolean): Unit = { + loadLogs(defaultConfig, topicConfigOverrides, isStray) // this could take a while if shutdown was not clean /* Schedule the cleanup task to delete old logs */ if (scheduler != null) { @@ -1540,40 +1542,38 @@ object LogManager { } /** - * Find logs which should not be on the current broker, according to the metadata image. - * - * @param brokerId The ID of the current broker. - * @param newTopicsImage The new topics image after broker has been reloaded - * @param logs A collection of Log objects. + * Returns true if the given log should not be on the current broker + * according to the metadata image. * - * @return The topic partitions which are no longer needed on this broker. + * @param brokerId The ID of the current broker. + * @param newTopicsImage The new topics image after broker has been reloaded + * @param log The log object to check + * @return true if the log should not exist on the broker, false otherwise. 
*/ - def findStrayReplicas( - brokerId: Int, - newTopicsImage: TopicsImage, - logs: Iterable[UnifiedLog] - ): Iterable[TopicPartition] = { - logs.flatMap { log => - val topicId = log.topicId.getOrElse { - throw new RuntimeException(s"The log dir $log does not have a topic ID, " + - "which is not allowed when running in KRaft mode.") - } + def isStrayKraftReplica( + brokerId: Int, + newTopicsImage: TopicsImage, + log: UnifiedLog + ): Boolean = { + val topicId = log.topicId.getOrElse { + throw new RuntimeException(s"The log dir $log does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + } - val partitionId = log.topicPartition.partition() - Option(newTopicsImage.getPartition(topicId, partitionId)) match { - case Some(partition) => - if (!partition.replicas.contains(brokerId)) { - info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " + - s"does not contain the local brokerId $brokerId.") - Some(log.topicPartition) - } else { - None - } + val partitionId = log.topicPartition.partition() + Option(newTopicsImage.getPartition(topicId, partitionId)) match { + case Some(partition) => + if (!partition.replicas.contains(brokerId)) { + info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " + + s"does not contain the local brokerId $brokerId.") + true + } else { + false + } - case None => - info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image") - Some(log.topicPartition) - } + case None => + info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image") + true } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a8f6cc52d67..e9b386992b1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2785,7 +2785,7 @@ class ReplicaManager(val 
config: KafkaConfig, stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." + s"A topic with the same name but different id exists but it resides in an offline log " + s"directory.") - val partition = Partition(tp, time, this) + val partition = Partition(new TopicIdPartition(topicId, tp), time, this) allPartitions.put(tp, HostedPartition.Online(partition)) Some(partition, true) } @@ -2808,7 +2808,7 @@ class ReplicaManager(val config: KafkaConfig, s"$topicId.") } // it's a partition that we don't know about yet, so create it and mark it online - val partition = Partition(tp, time, this) + val partition = Partition(new TopicIdPartition(topicId, tp), time, this) allPartitions.put(tp, HostedPartition.Online(partition)) Some(partition, true) } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index e49b910d246..070f667db23 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -289,13 +289,10 @@ class BrokerMetadataPublisher( try { // Start log manager, which will perform (potentially lengthy) // recovery-from-unclean-shutdown if required. - logManager.startup(metadataCache.getAllTopics()) - - // Delete partition directories which we're not supposed to have. We have - // to do this before starting ReplicaManager, so that the stray replicas - // don't block creation of new ones with different IDs but the same names. - // See KAFKA-14616 for details. - logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics()) + logManager.startup( + metadataCache.getAllTopics(), + isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log) + ) // Make the LogCleaner available for reconfiguration. 
We can't do this prior to this // point because LogManager#startup creates the LogCleaner object, if diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 5232f3b10b5..2248aa5fd66 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -130,7 +130,8 @@ class LogLoaderTest { override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig, - topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog = { + topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Int], + shouldBeStrayKraftLog: UnifiedLog => Boolean): UnifiedLog = { if (simulateError.hasError) { simulateError.errorType match { case ErrorTypes.KafkaStorageExceptionWithIOExceptionCause => @@ -176,7 +177,7 @@ class LogLoaderTest { val runLoadLogs: Executable = () => { val defaultConfig = logManager.currentDefaultConfig - logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false) } (logManager, runLoadLogs) @@ -190,13 +191,13 @@ class LogLoaderTest { cleanShutdownFileHandler.write(0L) cleanShutdownInterceptedValue = false var defaultConfig = logManager.currentDefaultConfig - logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false) assertTrue(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag") assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed") // Load logs without clean shutdown file 
cleanShutdownInterceptedValue = true defaultConfig = logManager.currentDefaultConfig - logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false) assertFalse(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag") assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed") // Create clean shutdown file and then simulate error while loading logs such that log loading does not complete. @@ -233,7 +234,7 @@ class LogLoaderTest { simulateError.hasError = false cleanShutdownInterceptedValue = true val defaultConfig = logManager.currentDefaultConfig - logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty)) + logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false) assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag") logManager.shutdown() } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index d4176fe1fdd..20a37e9be54 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -257,7 +257,7 @@ class LogManagerTest { invocation.callRealMethod().asInstanceOf[UnifiedLog] loadLogCalled = loadLogCalled + 1 }.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]], - any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]]) + any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]], any[UnifiedLog => Boolean]()) val t = new Thread() { override def run(): Unit = { logManager.startup(Set.empty) } @@ -515,7 +515,29 @@ class LogManagerTest { val remoteIndexCache = new 
File(logDir, RemoteIndexCache.DIR_NAME) remoteIndexCache.mkdir() logManager = createLogManager(Seq(logDir)) - logManager.loadLogs(logConfig, Map.empty) + logManager.loadLogs(logConfig, Map.empty, _ => false) + } + + @Test + def testLoadLogRenameLogThatShouldBeStray(): Unit = { + var invokedCount = 0 + val logDir = TestUtils.tempDir() + logManager = createLogManager(Seq(logDir)) + + val testTopic = "test-stray-topic" + val testTopicPartition = new TopicPartition(testTopic, 0) + val log = logManager.getOrCreateLog(testTopicPartition, topicId = Some(Uuid.randomUuid())) + def providedIsStray(log: UnifiedLog) = { + invokedCount += 1 + true + } + + logManager.loadLog(log.dir, true, Map.empty, Map.empty, logConfig, Map.empty, new ConcurrentHashMap[String, Int](), providedIsStray) + assertEquals(1, invokedCount) + assertTrue( + logDir.listFiles().toSet + .exists(f => f.getName.startsWith(testTopic) && f.getName.endsWith(UnifiedLog.StrayDirSuffix)) + ) } /** @@ -948,7 +970,7 @@ class LogManagerTest { numRemainingSegments = mockMap) }.when(spyLogManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]], - any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]]) + any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]], any[UnifiedLog => Boolean]()) // do nothing for removeLogRecoveryMetrics for metrics verification doNothing().when(spyLogManager).removeLogRecoveryMetrics() @@ -1139,18 +1161,15 @@ class LogManagerTest { val recreatedFoo1 = new TopicIdPartition(Uuid.fromString("_dOOzPe3TfiWV21Lh7Vmqg"), new TopicPartition("foo", 1)) @Test - def testFindStrayReplicasInEmptyImage(): Unit = { - val image: TopicsImage = topicsImage(Seq()) - val onDisk = Seq(foo0, foo1, bar0, bar1, quux0) - val expected = onDisk.map(_.topicPartition()).toSet - assertEquals(expected, - LogManager.findStrayReplicas(0, - image, onDisk.map(mockLog(_)).toSet)) + def testIsStrayKraftReplicaWithEmptyImage(): Unit = { 
+ val image: TopicsImage = topicsImage(Seq()) + val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog(_)) + assertTrue(onDisk.forall(log => LogManager.isStrayKraftReplica(0, image, log))) } @Test - def testFindSomeStrayReplicasInImage(): Unit = { - val image: TopicsImage = topicsImage(Seq( + def testIsStrayKraftReplicaInImage(): Unit = { + val image: TopicsImage = topicsImage(Seq( topicImage(Map( foo0 -> Seq(0, 1, 2), )), @@ -1160,15 +1179,14 @@ class LogManagerTest { )) )) val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog(_)) - val expected = Set(foo1, quux0).map(_.topicPartition) - assertEquals(expected, - LogManager.findStrayReplicas(0, - image, onDisk).toSet) + val expectedStrays = Set(foo1, quux0).map(_.topicPartition()) + + onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log))) } @Test - def testFindSomeStrayReplicasInImageWithRemoteReplicas(): Unit = { - val image: TopicsImage = topicsImage(Seq( + def testIsStrayKraftReplicaInImageWithRemoteReplicas(): Unit = { + val image: TopicsImage = topicsImage(Seq( topicImage(Map( foo0 -> Seq(0, 1, 2), )), @@ -1178,10 +1196,9 @@ class LogManagerTest { )) )) val onDisk = Seq(foo0, bar0, bar1).map(mockLog(_)) - val expected = Set(bar0).map(_.topicPartition) - assertEquals(expected, - LogManager.findStrayReplicas(0, - image, onDisk).toSet) + val expectedStrays = Set(bar0).map(_.topicPartition) + + onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log))) } @Test diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 0ca8e8127fa..2f67e9c8edb 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4942,6 +4942,34 @@ class ReplicaManagerTest { } } + @Test + def 
testGetOrCreatePartitionShouldNotCreateOfflinePartition(): Unit = { + val localId = 1 + val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, "foo") + val directoryEventHandler = mock(classOf[DirectoryEventHandler]) + + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler) + try { + val directoryIds = replicaManager.logManager.directoryIdsSet.toList + assertEquals(directoryIds.size, 2) + val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds) + val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get + partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), + None) + + assertTrue(isNewWhenCreatedForFirstTime) + // mark topic partition as offline + replicaManager.markPartitionOffline(topicPartition0.topicPartition()) + + // recreate the partition again shouldn't create new partition + val recreateResults = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID) + assertTrue(recreateResults.isEmpty) + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + private def verifyRLMOnLeadershipChange(leaderPartitions: util.Set[Partition], followerPartitions: util.Set[Partition]): Unit = { val leaderCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) val followerCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 
b18bfb14979..d97ae72d770 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -180,7 +180,7 @@ public class ReplicaFetcherThreadBenchmark { AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class); Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp), - Mockito.mock(MetadataCache.class), logManager, isrChannelManager); + Mockito.mock(MetadataCache.class), logManager, isrChannelManager, topicId); partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty()); pool.put(tp, partition); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index 77585f161b1..65ffa821c46 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -124,7 +124,7 @@ public class PartitionMakeFollowerBenchmark { partition = new Partition(tp, 100, MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, - Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); + Mockito.mock(MetadataCache.class), logManager, alterPartitionManager, topicId); partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId, Option.empty()); executorService.submit((Runnable) () -> { SimpleRecord[] simpleRecords = new SimpleRecord[] { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java 
index 010602c9f0c..71ef4c5bb01 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -127,7 +127,7 @@ public class UpdateFollowerFetchStateBenchmark { partition = new Partition(topicPartition, 100, MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM, alterPartitionListener, delayedOperations, - Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); + Mockito.mock(MetadataCache.class), logManager, alterPartitionManager, topicId); partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty()); replica1 = partition.getReplica(1).get(); replica2 = partition.getReplica(2).get();