This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad934d3202c MINOR: Remove threadNamePrefix parameter from
ReplicaManager and ReplicaFetcherManager (#20069)
ad934d3202c is described below
commit ad934d3202c3bd371f04ca7cc3c0a3cf515fc74c
Author: Tsung-Han Ho (Miles Ho) <[email protected]>
AuthorDate: Tue Jul 1 20:36:50 2025 +0800
MINOR: Remove threadNamePrefix parameter from ReplicaManager and
ReplicaFetcherManager (#20069)
- remove `threadNamePrefix` from `ReplicaManager` constructor
- update `BrokerServer` to use updated constructor
- remove `threadNamePrefix` from `ReplicaFetcherManager`
Reviewers: PoAn Yang <[email protected]>, TengYao Chi
<[email protected]>
---
.../server/builders/ReplicaManagerBuilder.java | 1 -
core/src/main/scala/kafka/server/BrokerServer.scala | 1 -
.../scala/kafka/server/ReplicaFetcherManager.scala | 4 +---
.../main/scala/kafka/server/ReplicaManager.scala | 7 +++----
.../AbstractCoordinatorConcurrencyTest.scala | 3 +--
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 +-
.../server/ReplicaManagerConcurrencyTest.scala | 1 -
.../unit/kafka/server/ReplicaManagerTest.scala | 21 ++++++---------------
.../src/test/scala/unit/kafka/utils/TestUtils.scala | 1 -
9 files changed, 12 insertions(+), 29 deletions(-)
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index ba6221732f6..5426d55a64d 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -126,7 +126,6 @@ public class ReplicaManagerBuilder {
Option.empty(),
Option.empty(),
Option.empty(),
- Option.empty(),
() -> -1L,
Option.empty(),
DirectoryEventHandler.NOOP,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8f2e15e7dc0..c3e666a736f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -348,7 +348,6 @@ class BrokerServer(
logDirFailureChannel = logDirFailureChannel,
alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
- threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 9bcc93e8244..96308fb400f 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -28,7 +28,6 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
protected val replicaManager: ReplicaManager,
metrics: Metrics,
time: Time,
- threadNamePrefix: Option[String] = None,
quotaManager: ReplicationQuotaManager,
metadataVersionSupplier: () => MetadataVersion,
brokerEpochSupplier: () => Long)
@@ -38,8 +37,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
numFetchers = brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): ReplicaFetcherThread = {
- val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
- val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+ val threadName = s"ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
val logContext = new LogContext(s"[ReplicaFetcher
replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
s"fetcherId=$fetcherId] ")
val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig,
metrics, time, fetcherId,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f18fe120e2c..00efb5f7a07 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -225,7 +225,6 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
delayedRemoteListOffsetsPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
delayedShareFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedShareFetch]] = None,
- threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager:
Option[AddPartitionsToTxnManager] = None,
val directoryEventHandler: DirectoryEventHandler =
DirectoryEventHandler.NOOP,
@@ -263,7 +262,7 @@ class ReplicaManager(val config: KafkaConfig,
protected val localBrokerId = config.brokerId
protected val allPartitions = new ConcurrentHashMap[TopicPartition,
HostedPartition]
private val replicaStateChangeLock = new Object
- val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
+ val replicaFetcherManager = createReplicaFetcherManager(metrics, time, quotaManagers.follower)
private[server] val replicaAlterLogDirsManager =
createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@volatile private[server] var highWatermarkCheckpoints: Map[String,
OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
@@ -2202,8 +2201,8 @@ class ReplicaManager(val config: KafkaConfig,
})
}
- protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), brokerEpochSupplier)
+ protected def createReplicaFetcherManager(metrics: Metrics, time: Time, quotaManager: ReplicationQuotaManager) = {
+ new ReplicaFetcherManager(config, this, metrics, time, quotaManager, () => metadataCache.metadataVersion(), brokerEpochSupplier)
}
protected def createReplicaAlterLogDirsManager(quotaManager:
ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index d5dadcfd9f4..8f10811091d 100644
---
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -190,8 +190,7 @@ object AbstractCoordinatorConcurrencyTest {
delayedFetchPurgatoryParam = Some(delayedFetchPurgatoryParam),
delayedDeleteRecordsPurgatoryParam =
Some(delayedDeleteRecordsPurgatoryParam),
delayedRemoteFetchPurgatoryParam =
Some(delayedRemoteFetchPurgatoryParam),
- delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam),
- threadNamePrefix = Option(this.getClass.getName)) {
+ delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam)) {
@volatile var logs: mutable.Map[TopicPartition, (UnifiedLog, Long)] = _
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 6e2f3073ba1..f82a741d4d2 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -215,7 +215,7 @@ class LogRecoveryTest extends QuorumTestHarness {
server2.startup()
updateProducer()
// check if leader moves to the other server
- leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader))
+ leader = awaitLeaderChange(servers, topicPartition, oldLeaderOpt = Some(leader), timeout = 30000L)
assertEquals(1, leader, "Leader must move to broker 1")
assertEquals(hw, hwFile1.read().getOrDefault(topicPartition, 0L))
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 32bfa572544..52dd464e5c3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -200,7 +200,6 @@ class ReplicaManagerConcurrencyTest extends Logging {
override def createReplicaFetcherManager(
metrics: Metrics,
time: Time,
- threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager
): ReplicaFetcherManager = {
Mockito.mock(classOf[ReplicaFetcherManager])
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index cd758529df2..3f2753d3ab7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -255,8 +255,7 @@ class ReplicaManagerTest {
quotaManagers = quotaManager,
metadataCache = new KRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
- alterPartitionManager = alterPartitionManager,
- threadNamePrefix = Option(this.getClass.getName))
+ alterPartitionManager = alterPartitionManager)
try {
def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]):
Unit = {
assert(responseStatus.values.head.error ==
Errors.INVALID_REQUIRED_ACKS)
@@ -461,8 +460,7 @@ class ReplicaManagerTest {
quotaManagers = quotaManager,
metadataCache = new KRaftMetadataCache(config.brokerId, () =>
KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
- alterPartitionManager = alterPartitionManager,
- threadNamePrefix = Option(this.getClass.getName))
+ alterPartitionManager = alterPartitionManager)
// shutdown ReplicaManager so that metrics are removed
rm.shutdown(checkpointHW = false)
@@ -2615,15 +2613,13 @@ class ReplicaManagerTest {
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam =
Some(mockRemoteListOffsetsPurgatory),
- delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
- threadNamePrefix = Option(this.getClass.getName)) {
+ delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory)) {
override protected def createReplicaFetcherManager(metrics: Metrics,
time: Time,
- threadNamePrefix: Option[String],
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
val rm = this
- new ReplicaFetcherManager(this.config, rm, metrics, time, threadNamePrefix, replicationQuotaManager, () => this.metadataCache.metadataVersion(), () => 1) {
+ new ReplicaFetcherManager(this.config, rm, metrics, time, replicationQuotaManager, () => this.metadataCache.metadataVersion(), () => 1) {
override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher
replicaId=${rm.config.brokerId}, leaderId=${sourceBroker.id}, " +
@@ -3049,7 +3045,6 @@ class ReplicaManagerTest {
delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam =
Some(mockDelayedRemoteListOffsetsPurgatory),
delayedShareFetchPurgatoryParam = Some(mockDelayedShareFetchPurgatory),
- threadNamePrefix = Option(this.getClass.getName),
addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
directoryEventHandler = directoryEventHandler,
remoteLogManager = if (enableRemoteStorage) {
@@ -3062,7 +3057,6 @@ class ReplicaManagerTest {
override protected def createReplicaFetcherManager(
metrics: Metrics,
time: Time,
- threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager
): ReplicaFetcherManager = {
mockReplicaFetcherManager.getOrElse {
@@ -3070,15 +3064,13 @@ class ReplicaManagerTest {
super.createReplicaFetcherManager(
metrics,
time,
- threadNamePrefix,
quotaManager
)
val config = this.config
val metadataCache = this.metadataCache
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion(), () => 1) {
+ new ReplicaFetcherManager(config, this, metrics, time, quotaManager, () => metadataCache.metadataVersion(), () => 1) {
override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): ReplicaFetcherThread = {
- val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
- val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+ val threadName = s"ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
val tp = new TopicPartition(topic, 0)
val leader = new MockLeaderEndPoint() {
@@ -3107,7 +3099,6 @@ class ReplicaManagerTest {
super.createReplicaFetcherManager(
metrics,
time,
- threadNamePrefix,
quotaManager
)
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd69155097b..861c520c666 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -900,7 +900,6 @@ object TestUtils extends Logging {
} else if (oldLeaderOpt.isDefined) {
debug(s"Checking leader that has changed from $oldLeaderOpt")
brokers.find { broker =>
- broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
broker.config.brokerId != oldLeaderOpt.get &&
broker.replicaManager.onlinePartition(tp).exists(_.leaderLogIfLocal.isDefined)
}.map(_.config.brokerId)