This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 2a4ee610037 KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically (#15335)
2a4ee610037 is described below

commit 2a4ee610037aae2faac35be65efe0a4043c360b2
Author: Omnia Ibrahim <o.g.h.ibra...@gmail.com>
AuthorDate: Sat Apr 6 07:36:26 2024 +0100

    KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically (#15335)
    
    This PR fixes a bug introduced by #15263 that caused a topic partition to be
    re-created in another log directory whenever its original log directory went
    offline: Log directory failure re-creates partitions in another logdir
    automatically. (A minimal sketch of the approach follows below.)
    
    Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>, Igor Soarez <soa...@apple.com>, Gaurav Narula <gaurav_naru...@apple.com>, Proven Provenzano <pprovenz...@confluent.io>
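
    For readers skimming the patch, a minimal sketch of the idea, using
    hypothetical simplified names rather than the patched classes: stray
    detection is injected into log loading as a predicate, and a stray replica
    is renamed aside on disk instead of being deleted after startup, which is
    the step that previously raced with topic re-creation in another log
    directory.

        // Hypothetical stand-in type, for illustration only.
        case class LogHandle(topic: String, partition: Int, dir: String)

        // A log judged stray at load time is quarantined with a "-stray"-style
        // suffix (kept on disk, ignored by the broker) rather than deleted
        // later; the patch itself uses UnifiedLog.logStrayDirName for this.
        def loadLog(log: LogHandle, isStray: LogHandle => Boolean): LogHandle =
          if (isStray(log)) log.copy(dir = log.dir + "-stray")
          else log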
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 26 ++++--
 core/src/main/scala/kafka/log/LogManager.scala     | 94 +++++++++++-----------
 .../main/scala/kafka/server/ReplicaManager.scala   |  4 +-
 .../server/metadata/BrokerMetadataPublisher.scala  | 11 +--
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 11 +--
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 61 +++++++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 28 +++++++
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  2 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |  2 +-
 .../UpdateFollowerFetchStateBenchmark.java         |  2 +-
 10 files changed, 150 insertions(+), 91 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02e71f7445c..01946160a5f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition,
 object Partition {
   private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
 
-  def apply(topicPartition: TopicPartition,
+  def apply(topicIdPartition: TopicIdPartition,
             time: Time,
             replicaManager: ReplicaManager): Partition = {
+    Partition(
+      topicPartition = topicIdPartition.topicPartition(),
+      topicId = Option(topicIdPartition.topicId()),
+      time = time,
+      replicaManager = replicaManager)
+  }
+  def apply(topicPartition: TopicPartition,
+            time: Time,
+            replicaManager: ReplicaManager,
+            topicId: Option[Uuid] = None): Partition = {
 
     val isrChangeListener = new AlterPartitionListener {
       override def markIsrExpand(): Unit = {
@@ -127,6 +137,7 @@ object Partition {
       replicaManager.delayedDeleteRecordsPurgatory)
 
     new Partition(topicPartition,
+      _topicId = topicId,
       replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
       interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
       localBrokerId = replicaManager.config.brokerId,
@@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition,
                 delayedOperations: DelayedOperations,
                 metadataCache: MetadataCache,
                 logManager: LogManager,
-                alterIsrManager: AlterPartitionManager) extends Logging {
+                alterIsrManager: AlterPartitionManager,
+                @volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code by KAFKA-16212
+               ) extends Logging {
 
   import Partition.metricsGroup
-
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
 
@@ -570,11 +582,15 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
+   * Return either the value of _topicId if it is provided or return the topic id attached to the log itself.
+   * If _topicId is empty then the method will fetch topicId from the log and update _topicId.
    * @return the topic ID for the log or None if the log or the topic ID does not exist.
    */
   def topicId: Option[Uuid] = {
-    val log = this.log.orElse(logManager.getLog(topicPartition))
-    log.flatMap(_.topicId)
+    if (_topicId.isEmpty || _topicId.contains(Uuid.ZERO_UUID)) {
+      _topicId = this.log.orElse(logManager.getLog(topicPartition)).flatMap(_.topicId)
+    }
+    _topicId
   }
 
   // remoteReplicas will be called in the hot path, and must be inexpensive
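
The topicId accessor above memoizes the ID after the first successful read from
the log. The same caching pattern, as a self-contained sketch with illustrative
names (java.util.UUID standing in for Kafka's Uuid):

    import java.util.UUID

    // Cache an Option-valued lookup in a @volatile field: fall back to the
    // (possibly expensive) log lookup only while no real ID is cached yet.
    class TopicIdCache(loadFromLog: () => Option[UUID]) {
      private val Zero = new UUID(0L, 0L) // stand-in for Uuid.ZERO_UUID
      @volatile private var cached: Option[UUID] = None

      def topicId: Option[UUID] = {
        if (cached.isEmpty || cached.contains(Zero))
          cached = loadFromLog()
        cached
      }
    }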
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 6dd8a9d2a67..1d35aa59db1 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -325,7 +325,8 @@ class LogManager(logDirs: Seq[File],
                            logStartOffsets: Map[TopicPartition, Long],
                            defaultConfig: LogConfig,
                            topicConfigOverrides: Map[String, LogConfig],
-                           numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog = {
+                           numRemainingSegments: ConcurrentMap[String, Int],
+                           isStray: UnifiedLog => Boolean): UnifiedLog = {
     val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
@@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File],
     } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
       addStrayLog(topicPartition, log)
       warn(s"Loaded stray log: $logDir")
+    } else if (isStray(log)) {
+      // Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted.
+      // A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
+      // and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
+      // So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
+      log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
+      addStrayLog(log.topicPartition, log)
+      warn(s"Log in ${logDir.getAbsolutePath} marked stray and renamed to ${log.dir.getAbsolutePath}")
     } else {
       val previous = {
         if (log.isFuture)
@@ -399,7 +408,7 @@ class LogManager(logDirs: Seq[File],
   /**
    * Recover and load all logs in the given data directories
    */
-  private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
+  private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig], isStray: UnifiedLog => Boolean): Unit = {
     info(s"Loading logs from log dirs $liveLogDirs")
     val startMs = time.hiResClockMs()
     val threadPools = ArrayBuffer.empty[ExecutorService]
@@ -480,7 +489,7 @@ class LogManager(logDirs: Seq[File],
             val logLoadStartMs = time.hiResClockMs()
             try {
              log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
-                defaultConfig, topicConfigOverrides, numRemainingSegments))
+                defaultConfig, topicConfigOverrides, numRemainingSegments, isStray))
             } catch {
               case e: IOException =>
                 handleIOException(logDirAbsolutePath, e)
@@ -564,20 +573,10 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(topicNames: Set[String]): Unit = {
+  def startup(topicNames: Set[String], isStray: UnifiedLog => Boolean = _ => false): Unit = {
     // ensure consistency between default config and overrides
     val defaultConfig = currentDefaultConfig
-    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
-  }
-
-  def deleteStrayKRaftReplicas(
-    brokerId: Int,
-    image: TopicsImage
-  ): Unit = {
-    val strayPartitions = findStrayReplicas(brokerId, image, allLogs)
-    strayPartitions.foreach(topicPartition => {
-      asyncDelete(topicPartition, false, false, true)
-    })
+    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames), isStray)
   }
 
   // visible for testing
@@ -616,8 +615,11 @@ class LogManager(logDirs: Seq[File],
   }
 
   // visible for testing
-  private[log] def startupWithConfigOverrides(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
-    loadLogs(defaultConfig, topicConfigOverrides) // this could take a while if shutdown was not clean
+  private[log] def startupWithConfigOverrides(
+    defaultConfig: LogConfig,
+    topicConfigOverrides: Map[String, LogConfig],
+    isStray: UnifiedLog => Boolean): Unit = {
+    loadLogs(defaultConfig, topicConfigOverrides, isStray) // this could take a while if shutdown was not clean
 
     /* Schedule the cleanup task to delete old logs */
     if (scheduler != null) {
@@ -1540,40 +1542,38 @@ object LogManager {
   }
 
   /**
-   * Find logs which should not be on the current broker, according to the metadata image.
-   *
-   * @param brokerId        The ID of the current broker.
-   * @param newTopicsImage  The new topics image after broker has been reloaded
-   * @param logs            A collection of Log objects.
+   * Returns true if the given log should not be on the current broker
+   * according to the metadata image.
    *
-   * @return          The topic partitions which are no longer needed on this broker.
+   * @param brokerId       The ID of the current broker.
+   * @param newTopicsImage The new topics image after broker has been reloaded
+   * @param log            The log object to check
+   * @return true if the log should not exist on the broker, false otherwise.
    */
-  def findStrayReplicas(
-    brokerId: Int,
-    newTopicsImage: TopicsImage,
-    logs: Iterable[UnifiedLog]
-  ): Iterable[TopicPartition] = {
-    logs.flatMap { log =>
-      val topicId = log.topicId.getOrElse {
-        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
-          "which is not allowed when running in KRaft mode.")
-      }
+  def isStrayKraftReplica(
+   brokerId: Int,
+   newTopicsImage: TopicsImage,
+   log: UnifiedLog
+  ): Boolean = {
+    val topicId = log.topicId.getOrElse {
+      throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+        "which is not allowed when running in KRaft mode.")
+    }
 
-      val partitionId = log.topicPartition.partition()
-      Option(newTopicsImage.getPartition(topicId, partitionId)) match {
-        case Some(partition) =>
-          if (!partition.replicas.contains(brokerId)) {
-            info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " +
-              s"does not contain the local brokerId $brokerId.")
-            Some(log.topicPartition)
-          } else {
-            None
-          }
+    val partitionId = log.topicPartition.partition()
+    Option(newTopicsImage.getPartition(topicId, partitionId)) match {
+      case Some(partition) =>
+        if (!partition.replicas.contains(brokerId)) {
+          info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " +
+            s"does not contain the local brokerId $brokerId.")
+          true
+        } else {
+          false
+        }
 
-        case None =>
-          info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
-          Some(log.topicPartition)
-      }
+      case None =>
+        info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
+        true
     }
   }
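
The refactor above turns the bulk findStrayReplicas scan into a per-log check.
The decision logic, restated as a self-contained sketch in which a plain Map
stands in for TopicsImage (illustrative names only):

    import java.util.UUID

    // A replica is stray when its topic ID is absent from the metadata image,
    // or the image's assignment for the partition no longer lists this broker.
    def isStray(brokerId: Int,
                assignments: Map[(UUID, Int), Seq[Int]], // (topicId, partition) -> replicas
                topicId: UUID,
                partition: Int): Boolean =
      assignments.get((topicId, partition)) match {
        case Some(replicas) => !replicas.contains(brokerId)
        case None           => true
      }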
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a8f6cc52d67..e9b386992b1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2785,7 +2785,7 @@ class ReplicaManager(val config: KafkaConfig,
          stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
            s"A topic with the same name but different id exists but it resides in an offline log " +
             s"directory.")
-          val partition = Partition(tp, time, this)
+          val partition = Partition(new TopicIdPartition(topicId, tp), time, this)
           allPartitions.put(tp, HostedPartition.Online(partition))
           Some(partition, true)
         }
@@ -2808,7 +2808,7 @@ class ReplicaManager(val config: KafkaConfig,
             s"$topicId.")
         }
        // it's a partition that we don't know about yet, so create it and mark it online
-        val partition = Partition(tp, time, this)
+        val partition = Partition(new TopicIdPartition(topicId, tp), time, this)
         allPartitions.put(tp, HostedPartition.Online(partition))
         Some(partition, true)
     }
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e49b910d246..070f667db23 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -289,13 +289,10 @@ class BrokerMetadataPublisher(
     try {
       // Start log manager, which will perform (potentially lengthy)
       // recovery-from-unclean-shutdown if required.
-      logManager.startup(metadataCache.getAllTopics())
-
-      // Delete partition directories which we're not supposed to have. We have
-      // to do this before starting ReplicaManager, so that the stray replicas
-      // don't block creation of new ones with different IDs but the same names.
-      // See KAFKA-14616 for details.
-      logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+      logManager.startup(
+        metadataCache.getAllTopics(),
+        isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+      )
 
      // Make the LogCleaner available for reconfiguration. We can't do this prior to this
       // point because LogManager#startup creates the LogCleaner object, if
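
The call site passes a closure that captures brokerId and the current metadata
image, so LogManager itself stays free of KRaft-specific types. A sketch of
that wiring shape, under hypothetical simplified types (only the
closure-injection pattern matches the patch):

    object WiringSketch extends App {
      case class Image(live: Set[(String, Int)]) // (topic, partition) in the image

      class Logs {
        def startup(isStray: ((String, Int)) => Boolean): Unit =
          Seq(("foo", 0), ("bar", 1)).filter(isStray).foreach(tp => println(s"stray: $tp"))
      }

      val image = Image(live = Set(("foo", 0)))
      new Logs().startup(tp => !image.live.contains(tp)) // prints: stray: (bar,1)
    }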
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 5232f3b10b5..2248aa5fd66 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -130,7 +130,8 @@ class LogLoaderTest {
 
         override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                              logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
-                             topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog = {
+                             topicConfigs: Map[String, LogConfig], numRemainingSegments: ConcurrentMap[String, Int],
+                             shouldBeStrayKraftLog: UnifiedLog => Boolean): UnifiedLog = {
           if (simulateError.hasError) {
             simulateError.errorType match {
               case ErrorTypes.KafkaStorageExceptionWithIOExceptionCause =>
@@ -176,7 +177,7 @@ class LogLoaderTest {
 
       val runLoadLogs: Executable = () => {
         val defaultConfig = logManager.currentDefaultConfig
-        logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+        logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false)
       }
 
       (logManager, runLoadLogs)
@@ -190,13 +191,13 @@ class LogLoaderTest {
       cleanShutdownFileHandler.write(0L)
       cleanShutdownInterceptedValue = false
       var defaultConfig = logManager.currentDefaultConfig
-      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false)
       assertTrue(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
       assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed")
       // Load logs without clean shutdown file
       cleanShutdownInterceptedValue = true
       defaultConfig = logManager.currentDefaultConfig
-      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false)
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
       assertFalse(cleanShutdownFileHandler.exists(), "Clean shutdown file must not exist after loadLogs has completed")
      // Create clean shutdown file and then simulate error while loading logs such that log loading does not complete.
@@ -233,7 +234,7 @@ class LogLoaderTest {
       simulateError.hasError = false
       cleanShutdownInterceptedValue = true
       val defaultConfig = logManager.currentDefaultConfig
-      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+      logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty), _ => false)
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
       logManager.shutdown()
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d4176fe1fdd..20a37e9be54 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -257,7 +257,7 @@ class LogManagerTest {
       invocation.callRealMethod().asInstanceOf[UnifiedLog]
       loadLogCalled = loadLogCalled + 1
    }.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
-      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
+      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]], any[UnifiedLog => Boolean]())
 
     val t = new Thread() {
       override def run(): Unit = { logManager.startup(Set.empty) }
@@ -515,7 +515,29 @@ class LogManagerTest {
     val remoteIndexCache = new File(logDir, RemoteIndexCache.DIR_NAME)
     remoteIndexCache.mkdir()
     logManager = createLogManager(Seq(logDir))
-    logManager.loadLogs(logConfig, Map.empty)
+    logManager.loadLogs(logConfig, Map.empty, _ => false)
+  }
+
+  @Test
+  def testLoadLogRenameLogThatShouldBeStray(): Unit = {
+    var invokedCount = 0
+    val logDir = TestUtils.tempDir()
+    logManager = createLogManager(Seq(logDir))
+
+    val testTopic = "test-stray-topic"
+    val testTopicPartition = new TopicPartition(testTopic, 0)
+    val log = logManager.getOrCreateLog(testTopicPartition, topicId = Some(Uuid.randomUuid()))
+    def providedIsStray(log: UnifiedLog) = {
+      invokedCount += 1
+      true
+    }
+
+    logManager.loadLog(log.dir, true, Map.empty, Map.empty, logConfig, Map.empty, new ConcurrentHashMap[String, Int](), providedIsStray)
+    assertEquals(1, invokedCount)
+    assertTrue(
+      logDir.listFiles().toSet
+      .exists(f => f.getName.startsWith(testTopic) && f.getName.endsWith(UnifiedLog.StrayDirSuffix))
+    )
   }
 
   /**
@@ -948,7 +970,7 @@ class LogManagerTest {
         numRemainingSegments = mockMap)
 
    }.when(spyLogManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
-      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
+      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]], any[UnifiedLog => Boolean]())
 
     // do nothing for removeLogRecoveryMetrics for metrics verification
     doNothing().when(spyLogManager).removeLogRecoveryMetrics()
@@ -1139,18 +1161,15 @@ class LogManagerTest {
   val recreatedFoo1 = new TopicIdPartition(Uuid.fromString("_dOOzPe3TfiWV21Lh7Vmqg"), new TopicPartition("foo", 1))
 
   @Test
-  def testFindStrayReplicasInEmptyImage(): Unit = {
-    val image: TopicsImage  = topicsImage(Seq())
-    val onDisk = Seq(foo0, foo1, bar0, bar1, quux0)
-    val expected = onDisk.map(_.topicPartition()).toSet
-    assertEquals(expected,
-      LogManager.findStrayReplicas(0,
-        image, onDisk.map(mockLog(_)).toSet))
+  def testIsStrayKraftReplicaWithEmptyImage(): Unit = {
+    val image: TopicsImage = topicsImage(Seq())
+    val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog(_))
+    assertTrue(onDisk.forall(log => LogManager.isStrayKraftReplica(0, image, log)))
   }
 
   @Test
-  def testFindSomeStrayReplicasInImage(): Unit = {
-    val image: TopicsImage  = topicsImage(Seq(
+  def testIsStrayKraftReplicaInImage(): Unit = {
+    val image: TopicsImage = topicsImage(Seq(
       topicImage(Map(
         foo0 -> Seq(0, 1, 2),
       )),
@@ -1160,15 +1179,14 @@ class LogManagerTest {
       ))
     ))
     val onDisk = Seq(foo0, foo1, bar0, bar1, quux0).map(mockLog(_))
-    val expected = Set(foo1, quux0).map(_.topicPartition)
-    assertEquals(expected,
-      LogManager.findStrayReplicas(0,
-        image, onDisk).toSet)
+    val expectedStrays = Set(foo1, quux0).map(_.topicPartition())
+
+    onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
   }
 
   @Test
-  def testFindSomeStrayReplicasInImageWithRemoteReplicas(): Unit = {
-    val image: TopicsImage  = topicsImage(Seq(
+  def testIsStrayKraftReplicaInImageWithRemoteReplicas(): Unit = {
+    val image: TopicsImage = topicsImage(Seq(
       topicImage(Map(
         foo0 -> Seq(0, 1, 2),
       )),
@@ -1178,10 +1196,9 @@ class LogManagerTest {
       ))
     ))
     val onDisk = Seq(foo0, bar0, bar1).map(mockLog(_))
-    val expected = Set(bar0).map(_.topicPartition)
-    assertEquals(expected,
-      LogManager.findStrayReplicas(0,
-        image, onDisk).toSet)
+    val expectedStrays = Set(bar0).map(_.topicPartition)
+
+    onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0ca8e8127fa..2f67e9c8edb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4942,6 +4942,34 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testGetOrCreatePartitionShouldNotCreateOfflinePartition(): Unit = {
+    val localId = 1
+    val topicPartition0 = new TopicIdPartition(FOO_UUID, 0, "foo")
+    val directoryEventHandler = mock(classOf[DirectoryEventHandler])
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, setupLogDirMetaProperties = true, directoryEventHandler = directoryEventHandler)
+    try {
+      val directoryIds = replicaManager.logManager.directoryIdsSet.toList
+      assertEquals(directoryIds.size, 2)
+      val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds)
+      val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get
+      partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)),
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints),
+        None)
+
+      assertTrue(isNewWhenCreatedForFirstTime)
+      // mark topic partition as offline
+      replicaManager.markPartitionOffline(topicPartition0.topicPartition())
+
+      // recreate the partition again shouldn't create new partition
+      val recreateResults = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID)
+      assertTrue(recreateResults.isEmpty)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
  private def verifyRLMOnLeadershipChange(leaderPartitions: util.Set[Partition], followerPartitions: util.Set[Partition]): Unit = {
    val leaderCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
    val followerCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index b18bfb14979..d97ae72d770 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -180,7 +180,7 @@ public class ReplicaFetcherThreadBenchmark {
             AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
             Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(),
                     0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp),
-                    Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
+                    Mockito.mock(MetadataCache.class), logManager, isrChannelManager, topicId);
 
             partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty());
             pool.put(tp, partition);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 77585f161b1..65ffa821c46 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -124,7 +124,7 @@ public class PartitionMakeFollowerBenchmark {
         partition = new Partition(tp, 100,
             MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM,
             alterPartitionListener, delayedOperations,
-            Mockito.mock(MetadataCache.class), logManager, alterPartitionManager);
+            Mockito.mock(MetadataCache.class), logManager, alterPartitionManager, topicId);
         partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId, Option.empty());
         executorService.submit((Runnable) () -> {
             SimpleRecord[] simpleRecords = new SimpleRecord[] {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 010602c9f0c..71ef4c5bb01 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -127,7 +127,7 @@ public class UpdateFollowerFetchStateBenchmark {
         partition = new Partition(topicPartition, 100,
                 MetadataVersion.latestTesting(), 0, () -> -1, Time.SYSTEM,
                 alterPartitionListener, delayedOperations,
-                Mockito.mock(MetadataCache.class), logManager, alterPartitionManager);
+                Mockito.mock(MetadataCache.class), logManager, alterPartitionManager, topicId);
         partition.makeLeader(partitionState, offsetCheckpoints, topicId, Option.empty());
         replica1 = partition.getReplica(1).get();
         replica2 = partition.getReplica(2).get();
