This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b18f00b4492 KAFKA-19121 Move AddPartitionsToTxnConfig and TransactionStateManagerConfig out of KafkaConfig (#19439)
b18f00b4492 is described below

commit b18f00b4492d2a44c47e675348b8e7472fa3073c
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Apr 15 01:16:30 2025 +0800

    KAFKA-19121 Move AddPartitionsToTxnConfig and TransactionStateManagerConfig out of KafkaConfig (#19439)
    
    Both AddPartitionsToTxnConfig and TransactionStateManagerConfig are
    static configs, and neither has any config-specific validation. We can
    move them out of KafkaConfig to simplify KafkaConfig.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../coordinator/transaction/TransactionCoordinator.scala    | 13 +++++++------
 core/src/main/scala/kafka/log/LogManager.scala              |  4 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala          |  6 ------
 core/src/main/scala/kafka/server/ReplicaManager.scala       |  7 ++++---
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala   |  4 ++--
 5 files changed, 15 insertions(+), 19 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index ed8140e596f..7130d39136e 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, 
TransactionResult}
 import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionLogConfig}
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.util.Scheduler
@@ -47,16 +47,17 @@ object TransactionCoordinator {
             time: Time): TransactionCoordinator = {
 
     val transactionLogConfig = new TransactionLogConfig(config)
-    val txnConfig = 
TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
-      config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+    val transactionStateManagerConfig = new 
TransactionStateManagerConfig(config)
+    val txnConfig = 
TransactionConfig(transactionStateManagerConfig.transactionalIdExpirationMs,
+      transactionStateManagerConfig.transactionMaxTimeoutMs,
       transactionLogConfig.transactionTopicPartitions,
       transactionLogConfig.transactionTopicReplicationFactor,
       transactionLogConfig.transactionTopicSegmentBytes,
       transactionLogConfig.transactionLoadBufferSize,
       transactionLogConfig.transactionTopicMinISR,
-      
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
-      
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
-      config.transactionStateManagerConfig.transaction2PCEnabled,
+      
transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
+      
transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
+      transactionStateManagerConfig.transaction2PCEnabled,
       config.requestTimeoutMs)
 
     val txnStateManager = new TransactionStateManager(config.brokerId, 
scheduler, replicaManager, metadataCache, txnConfig,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index fc5b15e8fc6..506e46e6ce5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -29,7 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, 
Uuid}
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, 
KafkaStorageException, LogDirNotFoundException}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -1565,7 +1565,7 @@ object LogManager {
       flushRecoveryOffsetCheckpointMs = 
config.logFlushOffsetCheckpointIntervalMs,
       flushStartOffsetCheckpointMs = 
config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
-      maxTransactionTimeoutMs = 
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+      maxTransactionTimeoutMs = new 
TransactionStateManagerConfig(config).transactionMaxTimeoutMs,
       producerStateManagerConfig = new 
ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, 
transactionLogConfig.transactionPartitionVerificationEnable),
       producerIdExpirationCheckIntervalMs = 
transactionLogConfig.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 962488bc140..2f830ea8c99 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,11 +203,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
   def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
 
-  private val _transactionStateManagerConfig = new 
TransactionStateManagerConfig(this)
-  private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
-  def transactionStateManagerConfig: TransactionStateManagerConfig = 
_transactionStateManagerConfig
-  def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = 
_addPartitionsToTxnConfig
-
   private val _quotaConfig = new QuotaConfig(this)
   def quotaConfig: QuotaConfig = _quotaConfig
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2dbfabd86d9..31756aea0a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,7 +47,7 @@ import 
org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
@@ -287,6 +287,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val defaultActionQueue: ActionQueue = new 
DelayedActionQueue
                      ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  private val addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(config)
 
   val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
     new DelayedOperationPurgatory[DelayedProduce](
@@ -842,8 +843,8 @@ class ReplicaManager(val config: KafkaConfig,
       requestLocal
     )
 
-    val retryTimeoutMs = 
Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), 
config.requestTimeoutMs)
-    val addPartitionsRetryBackoffMs = 
config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
+    val retryTimeoutMs = 
Math.min(addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), 
config.requestTimeoutMs)
+    val addPartitionsRetryBackoffMs = 
addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
     val startVerificationTimeMs = time.milliseconds
     def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, 
Errors], Map[TopicPartition, VerificationGuard])): Unit = {
       if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 61c9b681cd8..0a35e7dfc8b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -53,7 +53,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig}
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
@@ -2309,7 +2309,7 @@ class ReplicaManagerTest {
         assertFalse(result.hasFired)
         assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
-        
time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
+        time.sleep(new 
AddPartitionsToTxnConfig(config).addPartitionsToTxnRetryBackoffMs + 1)
         scheduler.tick()
 
         verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(

Reply via email to