This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-16649
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 32eb8c3d68d4d6fc30705a55371cfe2ee57b8f77
Author: Colin P. McCabe <cmcc...@apache.org>
AuthorDate: Tue Apr 30 14:05:32 2024 -0700

    KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable
    
    Do not acquire the DynamicBrokerConfig lock in
    DynamicBrokerConfig.addReconfigurable, addBrokerReconfigurable, and
    removeReconfigurable. It's not necessary, because the list that these
    functions are modifying is a thread-safe
    CopyOnWriteArrayList.  In 
DynamicBrokerConfig.reloadUpdatedFilesWithoutConfigChange, I changed the
    code to use a simple Java forEach rather than a Scala conversion, in order 
to feel more confident
    that concurrent modifications to the List would not have any bad effects
here. (forEach is always
    safe on CopyOnWriteArrayList, because it iterates over an immutable
snapshot of the backing array.)
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 26 +++++++++--------
 .../kafka/server/KRaftClusterTest.scala            | 34 ++++++++++++++++++++--
 2 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index e2879d4378e..156d2d02b09 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -303,17 +303,17 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     addBrokerReconfigurable(controller.socketServer)
   }
 
-  def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+  def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
     reconfigurables.add(reconfigurable)
   }
 
-  def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+  def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
     brokerReconfigurables.add(reconfigurable)
   }
 
-  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = {
     reconfigurables.remove(reconfigurable)
   }
 
@@ -370,16 +370,18 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
    * changes are processed. At the moment, only listener configs are 
considered for reloading.
    */
   private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: 
Properties): Unit = CoreUtils.inWriteLock(lock) {
-    reconfigurables.asScala
-      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
-      .foreach {
-        case reconfigurable: ListenerReconfigurable =>
-          val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = 
true)
-          val newConfig = new KafkaConfig(kafkaProps.asJava, false, None)
-          processListenerReconfigurable(reconfigurable, newConfig, 
Collections.emptyMap(), validateOnly = false, reloadOnly = true)
-        case reconfigurable =>
-          trace(s"Files will not be reloaded without config change for 
$reconfigurable")
+    reconfigurables.forEach(r => {
+      if (ReloadableFileConfigs.exists(r.reconfigurableConfigs.contains)) {
+        r match {
+          case reconfigurable: ListenerReconfigurable =>
+            val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = 
true)
+            val newConfig = new KafkaConfig(kafkaProps.asJava, false, None)
+            processListenerReconfigurable(reconfigurable, newConfig, 
Collections.emptyMap(), validateOnly = false, reloadOnly = true)
+          case reconfigurable =>
+            trace(s"Files will not be reloaded without config change for 
$reconfigurable")
+        }
       }
+    })
   }
 
   private def maybeCreatePasswordEncoder(secret: Option[Password]): 
Option[PasswordEncoder] = {
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index f5db6bd1a95..f54f4e8b3fd 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.KRaftConfigs
+import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.quota
 import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
@@ -64,7 +64,7 @@ import java.util.concurrent.{CompletableFuture, 
CompletionStage, ExecutionExcept
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
-import scala.collection.mutable
+import scala.collection.{Seq, mutable}
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
 
@@ -1579,6 +1579,36 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testReduceNumNetworkThreads(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).
+      setConfigProp(ServerConfigs.NUM_NETWORK_THREADS_CONFIG, "4").
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.incrementalAlterConfigs(
+          Collections.singletonMap(new ConfigResource(Type.BROKER, ""),
+            Collections.singletonList(new AlterConfigOp(
+              new ConfigEntry(ServerConfigs.NUM_NETWORK_THREADS_CONFIG, "8"), 
OpType.SET)))).all().get()
+        val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 
1.toShort))
+        val createTopicResult = admin.createTopics(newTopic)
+        createTopicResult.all().get()
+        waitForTopicListing(admin, Seq("test-topic"), Seq())
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
 }
 
 class BadAuthorizer extends Authorizer {

Reply via email to