tombentley commented on a change in pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#discussion_r769814004
##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -203,23 +207,22 @@ class SocketServer(val config: KafkaConfig,
* Before starting them, we ensure that authorizer has all the metadata to
authorize
* requests on that endpoint by waiting on the provided future.
*/
- private def startAcceptorAndProcessors(threadPrefix: String,
- endpoint: EndPoint,
+ private def startAcceptorAndProcessors(endpoint: EndPoint,
Review comment:
If `acceptor` has `endpoint` as a member, why do we need to pass it
explicitly here?
##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -95,20 +95,22 @@ class SocketServer(val config: KafkaConfig,
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS,
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false,
memoryPoolSensor) else MemoryPool.NONE
// data-plane
- private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
- private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint,
Acceptor]()
- val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests,
DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
+ private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint,
DataPlaneAcceptor]()
+ val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests,
DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
// control-plane
- private var controlPlaneProcessorOpt : Option[Processor] = None
- private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
+ private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] =
None
val controlPlaneRequestChannelOpt: Option[RequestChannel] =
config.controlPlaneListenerName.map(_ =>
-new RequestChannel(20, ControlPlaneMetricPrefix, time,
apiVersionManager.newRequestMetrics))
+new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time,
apiVersionManager.newRequestMetrics))
- private var nextProcessorId = 0
+ private val nextPId: AtomicInteger = new AtomicInteger(0)
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
private var startedProcessingRequests = false
private var stoppedProcessingRequests = false
+ def nextProcessorId(): Int = {
+nextPId.getAndIncrement()
Review comment:
`nextPId` is a bit hard on the eyes. `_nextProcessorId` perhaps? Also
perhaps a comment that this it only needed for the legacy metric names.
##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -920,7 +919,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String):
Map[String, AnyRef] = {
kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case
(key, _) =>
// skip the reconfigurable configs
- !DynamicSecurityConfigs.contains(key) &&
!SocketServer.ListenerReconfigurableConfigs.contains(key)
+ !DynamicSecurityConfigs.contains(key) &&
!SocketServer.ListenerReconfigurableConfigs.contains(key) &&
!DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)
Review comment:
IIUC the reconfigurability of thread numbers is not specific to the data
plane, so why is the constant declared on `DataPlaneAcceptor` rather than
`Acceptor`?
##
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##
@@ -82,6 +82,11 @@ class SocketServerTest {
private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
private var logLevelToRestore: Level = _
+ def endpoint: EndPoint = {
+KafkaConfig.fromProps(props, doLog = false).dataPlaneListeners.head
+ }
+ def listener: String = endpoint.listenerName.value
+ @volatile var uncaughtExceptions = 0
Review comment:
`uncaughtExceptions` gets incremented with += 1, which would require
atomic CAS rather than `volatile`.
##
File path:
core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable