This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b83a23a4f90 KAFKA-18946 Move BrokerReconfigurable and 
DynamicProducerStateManagerConfig to server module (#19174)
b83a23a4f90 is described below

commit b83a23a4f90f757b3fcec05c93f27385daa49bcd
Author: TengYao Chi <[email protected]>
AuthorDate: Thu Mar 20 21:30:19 2025 +0800

    KAFKA-18946 Move BrokerReconfigurable and DynamicProducerStateManagerConfig 
to server module (#19174)
    
    This patch is to move `DynamicProducerStateManagerConfig` and 
`BrokerReconfigurable` to the server module.
    
    Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 39 +++++-------
 core/src/main/scala/kafka/server/KafkaConfig.scala |  3 +-
 .../kafka/server/config/AbstractKafkaConfig.java   |  2 +
 .../kafka/server/config/BrokerReconfigurable.java  | 69 ++++++++++++++++++++++
 .../config/DynamicProducerStateManagerConfig.java  | 67 +++++++++++++++++++++
 5 files changed, 155 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b9b25c1afeb..29a3971d37f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -40,12 +40,12 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.server.{ProcessRole, DynamicThreadPool}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs, 
ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, 
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, 
MetricConfigs}
 import org.apache.kafka.server.telemetry.ClientTelemetry
 import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.apache.kafka.storage.internals.log.{LogConfig, 
ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.LogConfig
 
 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -323,6 +323,17 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     reconfigurables.add(reconfigurable)
   }
 
+  def addBrokerReconfigurable(reconfigurable: 
org.apache.kafka.server.config.BrokerReconfigurable): Unit = {
+    verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
+    brokerReconfigurables.add(new BrokerReconfigurable {
+      override def reconfigurableConfigs: Set[String] = 
reconfigurable.reconfigurableConfigs().asScala
+
+      override def validateReconfiguration(newConfig: KafkaConfig): Unit = 
reconfigurable.validateReconfiguration(newConfig)
+
+      override def reconfigure(oldConfig: KafkaConfig, newConfig: 
KafkaConfig): Unit = reconfigurable.reconfigure(oldConfig, newConfig)
+    })
+  }
+
   def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
     brokerReconfigurables.add(reconfigurable)
@@ -605,6 +616,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   }
 }
 
+/**
+ * Implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead.
+ */
 trait BrokerReconfigurable {
 
   def reconfigurableConfigs: Set[String]
@@ -977,27 +991,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
 
 }
 
-class DynamicProducerStateManagerConfig(val producerStateManagerConfig: 
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
-  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    if (producerStateManagerConfig.producerIdExpirationMs != 
newConfig.transactionLogConfig.producerIdExpirationMs) {
-      info(s"Reconfigure 
${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from 
${producerStateManagerConfig.producerIdExpirationMs} to 
${newConfig.transactionLogConfig.producerIdExpirationMs}")
-      
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.transactionLogConfig.producerIdExpirationMs)
-    }
-    if (producerStateManagerConfig.transactionVerificationEnabled != 
newConfig.transactionLogConfig.transactionPartitionVerificationEnable) {
-      info(s"Reconfigure 
${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from 
${producerStateManagerConfig.transactionVerificationEnabled} to 
${newConfig.transactionLogConfig.transactionPartitionVerificationEnable}")
-      
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionLogConfig.transactionPartitionVerificationEnable)
-    }
-  }
-
-  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-    if (newConfig.transactionLogConfig.producerIdExpirationMs < 0)
-      throw new 
ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} 
cannot be less than 0, current value is 
${producerStateManagerConfig.producerIdExpirationMs}, and new value is 
${newConfig.transactionLogConfig.producerIdExpirationMs}")
-  }
-
-  override def reconfigurableConfigs: Set[String] = 
DynamicProducerStateManagerConfig
-
-}
-
 class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable 
with Logging {
   override def reconfigurableConfigs: Set[String] = {
     DynamicRemoteLogConfig.ReconfigurableConfigs
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1c91f58492e..608e27f25e4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -204,10 +204,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
   def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
 
-  private val _transactionLogConfig = new TransactionLogConfig(this)
+  override val transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new 
TransactionStateManagerConfig(this)
   private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
-  def transactionLogConfig: TransactionLogConfig = _transactionLogConfig
   def transactionStateManagerConfig: TransactionStateManagerConfig = 
_transactionStateManagerConfig
   def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = 
_addPartitionsToTxnConfig
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 87bf18a412f..fc6906b96d3 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -83,4 +83,6 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
+
+    public abstract TransactionLogConfig transactionLogConfig();
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java 
b/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
new file mode 100644
index 00000000000..2b3fe9dcf34
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import java.util.Set;
+
+/**
+ * An interface for Kafka broker configs that support dynamic reconfiguration.
+ * <p>
+ * Components that implement this interface can have their configurations 
updated
+ * at runtime without requiring a broker restart.
+ * <p>
+ * The reconfiguration process follows three steps:
+ * <ol>
+ *   <li>Determining which configurations can be dynamically updated via 
{@link #reconfigurableConfigs()}</li>
+ *   <li>Validating the new configuration before applying it via {@link 
#validateReconfiguration(AbstractKafkaConfig)}</li>
+ *   <li>Applying the new configuration via {@link 
#reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li>
+ * </ol>
+ * <strong>Note: Since Kafka is eliminating Scala, developers should implement 
this interface instead of {@link kafka.server.BrokerReconfigurable}</strong>
+ *
+ *
+ * @see AbstractKafkaConfig
+ */
+public interface BrokerReconfigurable {
+    /**
+     * Returns the set of configuration keys that can be dynamically 
reconfigured.
+     *
+     * <p>
+     * Only the configurations returned by this method will be considered for
+     * dynamic updates by the broker.
+     *
+     * @return a set of configuration key names that can be dynamically updated
+     */
+    Set<String> reconfigurableConfigs();
+
+    /**
+     * Validates the new configuration before applying it.
+     * <p>
+     * This method should verify that the new configuration values are valid 
and
+     * can be safely applied.
+     *
+     * @param newConfig the new configuration to validate
+     */
+    void validateReconfiguration(AbstractKafkaConfig newConfig);
+
+    /**
+     * Applies the new configuration.
+     * <p>
+     * This method is called after the new configuration has been validated.
+     *
+     * @param oldConfig the previous configuration
+     * @param newConfig the new configuration to apply
+     */
+    void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig 
newConfig);
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
 
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
new file mode 100644
index 00000000000..4158194f000
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+public class DynamicProducerStateManagerConfig implements BrokerReconfigurable 
{
+    private final Logger log = 
LoggerFactory.getLogger(DynamicProducerStateManagerConfig.class);
+    private final ProducerStateManagerConfig producerStateManagerConfig;
+
+    public DynamicProducerStateManagerConfig(ProducerStateManagerConfig 
producerStateManagerConfig) {
+        this.producerStateManagerConfig = producerStateManagerConfig;
+    }
+
+    @Override
+    public Set<String> reconfigurableConfigs() {
+        return Set.of(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+    }
+
+    @Override
+    public void validateReconfiguration(AbstractKafkaConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = 
newConfig.transactionLogConfig();
+        if (transactionLogConfig.producerIdExpirationMs() < 0)
+            throw new 
ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG + "cannot 
be less than 0, current value is " +
+                                      
producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " + 
transactionLogConfig.producerIdExpirationMs());
+    }
+
+    @Override
+    public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig 
newConfig) {
+        TransactionLogConfig transactionLogConfig = 
newConfig.transactionLogConfig();
+        if (producerStateManagerConfig.producerIdExpirationMs() != 
transactionLogConfig.producerIdExpirationMs()) {
+            log.info("Reconfigure {} from {} to {}",
+                TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
+                producerStateManagerConfig.producerIdExpirationMs(),
+                transactionLogConfig.producerIdExpirationMs());
+            
producerStateManagerConfig.setProducerIdExpirationMs(transactionLogConfig.producerIdExpirationMs());
+        }
+        if (producerStateManagerConfig.transactionVerificationEnabled() != 
transactionLogConfig.transactionPartitionVerificationEnable()) {
+            log.info("Reconfigure {} from {} to {}",
+                
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
+                producerStateManagerConfig.transactionVerificationEnabled(),
+                transactionLogConfig.transactionPartitionVerificationEnable());
+            
producerStateManagerConfig.setTransactionVerificationEnabled(transactionLogConfig.transactionPartitionVerificationEnable());
+        }
+    }
+}

Reply via email to