This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch KAFKA-16649 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 32eb8c3d68d4d6fc30705a55371cfe2ee57b8f77 Author: Colin P. McCabe <cmcc...@apache.org> AuthorDate: Tue Apr 30 14:05:32 2024 -0700 KAFKA-16649: Remove lock from DynamicBrokerConfig.removeReconfigurable Do not acquire the DynamicBrokerConfig lock in DynamicBrokerConfig.removeReconfigurable. It's not necessary, because the list that these functions are modifying is a thread-safe CopyOnWriteArrayList. In DynamicBrokerConfig.reloadUpdatedFilesWithoutConfigChange, I changed the code to use a simple Java forEach rather than a Scala conversion, in order to feel more confident that concurrent modifications to the List would not have any bad effects here. (forEach is always safe on CopyOnWriteArrayList.) --- .../scala/kafka/server/DynamicBrokerConfig.scala | 26 +++++++++-------- .../kafka/server/KRaftClusterTest.scala | 34 ++++++++++++++++++++-- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index e2879d4378e..156d2d02b09 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -303,17 +303,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging addBrokerReconfigurable(controller.socketServer) } - def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { + def addReconfigurable(reconfigurable: Reconfigurable): Unit = { verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala) reconfigurables.add(reconfigurable) } - def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) { + def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = { verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs) brokerReconfigurables.add(reconfigurable) } - def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { + def removeReconfigurable(reconfigurable: Reconfigurable): Unit = { reconfigurables.remove(reconfigurable) } @@ -370,16 +370,18 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging * changes are processed. At the moment, only listener configs are considered for reloading. */ private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) { - reconfigurables.asScala - .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) - .foreach { - case reconfigurable: ListenerReconfigurable => - val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = true) - val newConfig = new KafkaConfig(kafkaProps.asJava, false, None) - processListenerReconfigurable(reconfigurable, newConfig, Collections.emptyMap(), validateOnly = false, reloadOnly = true) - case reconfigurable => - trace(s"Files will not be reloaded without config change for $reconfigurable") + reconfigurables.forEach(r => { + if (ReloadableFileConfigs.exists(r.reconfigurableConfigs.contains)) { + r match { + case reconfigurable: ListenerReconfigurable => + val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = true) + val newConfig = new KafkaConfig(kafkaProps.asJava, false, None) + processListenerReconfigurable(reconfigurable, newConfig, Collections.emptyMap(), validateOnly = false, reloadOnly = true) + case reconfigurable => + trace(s"Files will not be reloaded without config change for $reconfigurable") + } } + }) } private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = { diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index f5db6bd1a95..f54f4e8b3fd 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -46,7 +46,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} -import org.apache.kafka.server.config.KRaftConfigs +import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.quota import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType} @@ -64,7 +64,7 @@ import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionExcept import java.util.concurrent.atomic.AtomicInteger import java.util.{Collections, Optional, OptionalLong, Properties} import scala.annotation.nowarn -import scala.collection.mutable +import scala.collection.{Seq, mutable} import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS} import scala.jdk.CollectionConverters._ @@ -1579,6 +1579,36 @@ class KRaftClusterTest { cluster.close() } } + + @Test + def testReduceNumNetworkThreads(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()). + setConfigProp(ServerConfigs.NUM_NETWORK_THREADS_CONFIG, "4"). + build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.clientProperties()) + try { + admin.incrementalAlterConfigs( + Collections.singletonMap(new ConfigResource(Type.BROKER, ""), + Collections.singletonList(new AlterConfigOp( + new ConfigEntry(ServerConfigs.NUM_NETWORK_THREADS_CONFIG, "8"), OpType.SET)))).all().get() + val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 1.toShort)) + val createTopicResult = admin.createTopics(newTopic) + createTopicResult.all().get() + waitForTopicListing(admin, Seq("test-topic"), Seq()) + } finally { + admin.close() + } + } finally { + cluster.close() + } + } } class BadAuthorizer extends Authorizer {