This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b18f00b4492 KAFKA-19121 Move AddPartitionsToTxnConfig and
TransactionStateManagerConfig out of KafkaConfig (#19439)
b18f00b4492 is described below
commit b18f00b4492d2a44c47e675348b8e7472fa3073c
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Apr 15 01:16:30 2025 +0800
KAFKA-19121 Move AddPartitionsToTxnConfig and TransactionStateManagerConfig
out of KafkaConfig (#19439)
Both AddPartitionsToTxnConfig and TransactionStateManagerConfig are
static configs, and neither has any specific config checks. We can move
them out of KafkaConfig to simplify KafkaConfig.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../coordinator/transaction/TransactionCoordinator.scala | 13 +++++++------
core/src/main/scala/kafka/log/LogManager.scala | 4 ++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 ------
core/src/main/scala/kafka/server/ReplicaManager.scala | 7 ++++---
.../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 4 ++--
5 files changed, 15 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index ed8140e596f..7130d39136e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse,
TransactionResult}
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionLogConfig}
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager,
TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.util.Scheduler
@@ -47,16 +47,17 @@ object TransactionCoordinator {
time: Time): TransactionCoordinator = {
val transactionLogConfig = new TransactionLogConfig(config)
- val txnConfig =
TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
- config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+ val transactionStateManagerConfig = new
TransactionStateManagerConfig(config)
+ val txnConfig =
TransactionConfig(transactionStateManagerConfig.transactionalIdExpirationMs,
+ transactionStateManagerConfig.transactionMaxTimeoutMs,
transactionLogConfig.transactionTopicPartitions,
transactionLogConfig.transactionTopicReplicationFactor,
transactionLogConfig.transactionTopicSegmentBytes,
transactionLogConfig.transactionLoadBufferSize,
transactionLogConfig.transactionTopicMinISR,
-
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
-
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
- config.transactionStateManagerConfig.transaction2PCEnabled,
+
transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
+
transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
+ transactionStateManagerConfig.transaction2PCEnabled,
config.requestTimeoutMs)
val txnStateManager = new TransactionStateManager(config.brokerId,
scheduler, replicaManager, metadataCache, txnConfig,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index fc5b15e8fc6..506e46e6ce5 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -29,7 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition,
Uuid}
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException,
KafkaStorageException, LogDirNotFoundException}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import scala.jdk.CollectionConverters._
import scala.collection._
@@ -1565,7 +1565,7 @@ object LogManager {
flushRecoveryOffsetCheckpointMs =
config.logFlushOffsetCheckpointIntervalMs,
flushStartOffsetCheckpointMs =
config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
- maxTransactionTimeoutMs =
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
+ maxTransactionTimeoutMs = new
TransactionStateManagerConfig(config).transactionMaxTimeoutMs,
producerStateManagerConfig = new
ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs,
transactionLogConfig.transactionPartitionVerificationEnable),
producerIdExpirationCheckIntervalMs =
transactionLogConfig.producerIdExpirationCheckIntervalMs,
scheduler = kafkaScheduler,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 962488bc140..2f830ea8c99 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig,
TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,11 +203,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
- private val _transactionStateManagerConfig = new
TransactionStateManagerConfig(this)
- private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
- def transactionStateManagerConfig: TransactionStateManagerConfig =
_transactionStateManagerConfig
- def addPartitionsToTxnConfig: AddPartitionsToTxnConfig =
_addPartitionsToTxnConfig
-
private val _quotaConfig = new QuotaConfig(this)
def quotaConfig: QuotaConfig = _quotaConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2dbfabd86d9..31756aea0a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,7 +47,7 @@ import
org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig,
TransactionLogConfig}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
@@ -287,6 +287,7 @@ class ReplicaManager(val config: KafkaConfig,
val defaultActionQueue: ActionQueue = new
DelayedActionQueue
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+ private val addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(config)
val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
new DelayedOperationPurgatory[DelayedProduce](
@@ -842,8 +843,8 @@ class ReplicaManager(val config: KafkaConfig,
requestLocal
)
- val retryTimeoutMs =
Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(),
config.requestTimeoutMs)
- val addPartitionsRetryBackoffMs =
config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
+ val retryTimeoutMs =
Math.min(addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(),
config.requestTimeoutMs)
+ val addPartitionsRetryBackoffMs =
addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
val startVerificationTimeMs = time.milliseconds
def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition,
Errors], Map[TopicPartition, VerificationGuard])): Unit = {
if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 61c9b681cd8..0a35e7dfc8b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -53,7 +53,7 @@ import
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig,
TransactionLogConfig}
import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
@@ -2309,7 +2309,7 @@ class ReplicaManagerTest {
assertFalse(result.hasFired)
assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
-
time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
+ time.sleep(new
AddPartitionsToTxnConfig(config).addPartitionsToTxnRetryBackoffMs + 1)
scheduler.tick()
verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(