This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad934d3202c MINOR: Remove threadNamePrefix parameter from ReplicaManager and ReplicaFetcherManager (#20069)
ad934d3202c is described below

commit ad934d3202c3bd371f04ca7cc3c0a3cf515fc74c
Author: Tsung-Han Ho (Miles Ho) <[email protected]>
AuthorDate: Tue Jul 1 20:36:50 2025 +0800

    MINOR: Remove threadNamePrefix parameter from ReplicaManager and ReplicaFetcherManager (#20069)
    
    - remove `threadNamePrefix` from `ReplicaManager` constructor
    - update `BrokerServer` to use updated constructor
    - remove `threadNamePrefix` from `ReplicaFetcherManager`
    
    Reviewers: PoAn Yang <[email protected]>, TengYao Chi <[email protected]>
---
 .../server/builders/ReplicaManagerBuilder.java      |  1 -
 core/src/main/scala/kafka/server/BrokerServer.scala |  1 -
 .../scala/kafka/server/ReplicaFetcherManager.scala  |  4 +---
 .../main/scala/kafka/server/ReplicaManager.scala    |  7 +++----
 .../AbstractCoordinatorConcurrencyTest.scala        |  3 +--
 .../scala/unit/kafka/server/LogRecoveryTest.scala   |  2 +-
 .../server/ReplicaManagerConcurrencyTest.scala      |  1 -
 .../unit/kafka/server/ReplicaManagerTest.scala      | 21 ++++++---------------
 .../src/test/scala/unit/kafka/utils/TestUtils.scala |  1 -
 9 files changed, 12 insertions(+), 29 deletions(-)

diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index ba6221732f6..5426d55a64d 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -126,7 +126,6 @@ public class ReplicaManagerBuilder {
                              Option.empty(),
                              Option.empty(),
                              Option.empty(),
-                             Option.empty(),
                              () ->  -1L,
                              Option.empty(),
                              DirectoryEventHandler.NOOP,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8f2e15e7dc0..c3e666a736f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -348,7 +348,6 @@ class BrokerServer(
         logDirFailureChannel = logDirFailureChannel,
         alterPartitionManager = alterPartitionManager,
         brokerTopicStats = brokerTopicStats,
-        threadNamePrefix = None, // The ReplicaManager only runs on the 
broker, and already includes the ID in thread names.
         delayedRemoteFetchPurgatoryParam = None,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
         addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 9bcc93e8244..96308fb400f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -28,7 +28,6 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                             protected val replicaManager: ReplicaManager,
                             metrics: Metrics,
                             time: Time,
-                            threadNamePrefix: Option[String] = None,
                             quotaManager: ReplicationQuotaManager,
                             metadataVersionSupplier: () => MetadataVersion,
                             brokerEpochSupplier: () => Long)
@@ -38,8 +37,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
         numFetchers = brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): ReplicaFetcherThread = {
-    val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-    val threadName = 
s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+    val threadName = s"ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
     val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
       s"fetcherId=$fetcherId] ")
     val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, 
metrics, time, fetcherId,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f18fe120e2c..00efb5f7a07 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -225,7 +225,6 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedRemoteFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      delayedRemoteListOffsetsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
                      delayedShareFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
-                     threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
                      addPartitionsToTxnManager: 
Option[AddPartitionsToTxnManager] = None,
                      val directoryEventHandler: DirectoryEventHandler = 
DirectoryEventHandler.NOOP,
@@ -263,7 +262,7 @@ class ReplicaManager(val config: KafkaConfig,
   protected val localBrokerId = config.brokerId
   protected val allPartitions = new ConcurrentHashMap[TopicPartition, 
HostedPartition]
   private val replicaStateChangeLock = new Object
-  val replicaFetcherManager = createReplicaFetcherManager(metrics, time, 
threadNamePrefix, quotaManagers.follower)
+  val replicaFetcherManager = createReplicaFetcherManager(metrics, time, 
quotaManagers.follower)
   private[server] val replicaAlterLogDirsManager = 
createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   @volatile private[server] var highWatermarkCheckpoints: Map[String, 
OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
@@ -2202,8 +2201,8 @@ class ReplicaManager(val config: KafkaConfig,
       })
   }
 
-  protected def createReplicaFetcherManager(metrics: Metrics, time: Time, 
threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
-    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, 
quotaManager, () => metadataCache.metadataVersion(), brokerEpochSupplier)
+  protected def createReplicaFetcherManager(metrics: Metrics, time: Time, 
quotaManager: ReplicationQuotaManager) = {
+    new ReplicaFetcherManager(config, this, metrics, time, quotaManager, () => 
metadataCache.metadataVersion(), brokerEpochSupplier)
   }
 
   protected def createReplicaAlterLogDirsManager(quotaManager: 
ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index d5dadcfd9f4..8f10811091d 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -190,8 +190,7 @@ object AbstractCoordinatorConcurrencyTest {
       delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam),
       delayedDeleteRecordsPurgatoryParam = 
Some(delayedDeleteRecordsPurgatoryParam),
       delayedRemoteFetchPurgatoryParam = 
Some(delayedRemoteFetchPurgatoryParam),
-      delayedRemoteListOffsetsPurgatoryParam = 
Some(delayedRemoteListOffsetsPurgatoryParam),
-      threadNamePrefix = Option(this.getClass.getName)) {
+      delayedRemoteListOffsetsPurgatoryParam = 
Some(delayedRemoteListOffsetsPurgatoryParam)) {
 
     @volatile var logs: mutable.Map[TopicPartition, (UnifiedLog, Long)] = _
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 6e2f3073ba1..f82a741d4d2 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -215,7 +215,7 @@ class LogRecoveryTest extends QuorumTestHarness {
     server2.startup()
     updateProducer()
     // check if leader moves to the other server
-    leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = 
Some(leader))
+    leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = 
Some(leader), timeout = 30000L)
     assertEquals(1, leader, "Leader must move to broker 1")
 
     assertEquals(hw, hwFile1.read().getOrDefault(topicPartition, 0L))
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 32bfa572544..52dd464e5c3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -200,7 +200,6 @@ class ReplicaManagerConcurrencyTest extends Logging {
       override def createReplicaFetcherManager(
         metrics: Metrics,
         time: Time,
-        threadNamePrefix: Option[String],
         quotaManager: ReplicationQuotaManager
       ): ReplicaFetcherManager = {
         Mockito.mock(classOf[ReplicaFetcherManager])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index cd758529df2..3f2753d3ab7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -255,8 +255,7 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = new KRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      threadNamePrefix = Option(this.getClass.getName))
+      alterPartitionManager = alterPartitionManager)
     try {
       def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
         assert(responseStatus.values.head.error == 
Errors.INVALID_REQUIRED_ACKS)
@@ -461,8 +460,7 @@ class ReplicaManagerTest {
         quotaManagers = quotaManager,
         metadataCache = new KRaftMetadataCache(config.brokerId, () => 
KRaftVersion.KRAFT_VERSION_0),
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-        alterPartitionManager = alterPartitionManager,
-        threadNamePrefix = Option(this.getClass.getName))
+        alterPartitionManager = alterPartitionManager)
 
       // shutdown ReplicaManager so that metrics are removed
       rm.shutdown(checkpointHW = false)
@@ -2615,15 +2613,13 @@ class ReplicaManagerTest {
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
       delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
       delayedRemoteListOffsetsPurgatoryParam = 
Some(mockRemoteListOffsetsPurgatory),
-      delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
-      threadNamePrefix = Option(this.getClass.getName)) {
+      delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory)) {
 
       override protected def createReplicaFetcherManager(metrics: Metrics,
                                                          time: Time,
-                                                         threadNamePrefix: 
Option[String],
                                                          
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
         val rm = this
-        new ReplicaFetcherManager(this.config, rm, metrics, time, 
threadNamePrefix, replicationQuotaManager, () => 
this.metadataCache.metadataVersion(), () => 1) {
+        new ReplicaFetcherManager(this.config, rm, metrics, time, 
replicationQuotaManager, () => this.metadataCache.metadataVersion(), () => 1) {
 
           override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): ReplicaFetcherThread = {
             val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${rm.config.brokerId}, leaderId=${sourceBroker.id}, " +
@@ -3049,7 +3045,6 @@ class ReplicaManagerTest {
       delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
       delayedRemoteListOffsetsPurgatoryParam = 
Some(mockDelayedRemoteListOffsetsPurgatory),
       delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
-      threadNamePrefix = Option(this.getClass.getName),
       addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
       directoryEventHandler = directoryEventHandler,
       remoteLogManager = if (enableRemoteStorage) {
@@ -3062,7 +3057,6 @@ class ReplicaManagerTest {
       override protected def createReplicaFetcherManager(
         metrics: Metrics,
         time: Time,
-        threadNamePrefix: Option[String],
         quotaManager: ReplicationQuotaManager
       ): ReplicaFetcherManager = {
         mockReplicaFetcherManager.getOrElse {
@@ -3070,15 +3064,13 @@ class ReplicaManagerTest {
             super.createReplicaFetcherManager(
               metrics,
               time,
-              threadNamePrefix,
               quotaManager
             )
             val config = this.config
             val metadataCache = this.metadataCache
-            new ReplicaFetcherManager(config, this, metrics, time, 
threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) 
{
+            new ReplicaFetcherManager(config, this, metrics, time, 
quotaManager, () => metadataCache.metadataVersion(), () => 1) {
               override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): ReplicaFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-                val threadName = 
s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+                val threadName = 
s"ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
 
                 val tp = new TopicPartition(topic, 0)
                 val leader = new MockLeaderEndPoint() {
@@ -3107,7 +3099,6 @@ class ReplicaManagerTest {
             super.createReplicaFetcherManager(
               metrics,
               time,
-              threadNamePrefix,
               quotaManager
             )
           }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd69155097b..861c520c666 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -900,7 +900,6 @@ object TestUtils extends Logging {
       } else if (oldLeaderOpt.isDefined) {
           debug(s"Checking leader that has changed from $oldLeaderOpt")
           brokers.find { broker =>
-            
broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
             broker.config.brokerId != oldLeaderOpt.get &&
               
broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
           }.map(_.config.brokerId)

Reply via email to