This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 02d568c83c3 KAFKA-18854: Move part of
DynamicConfig/DynamicBrokerConfig to server… (#21302)
02d568c83c3 is described below
commit 02d568c83c32b9af425bc23bd506efabb520005f
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Jan 28 18:09:15 2026 +0100
KAFKA-18854: Move part of DynamicConfig/DynamicBrokerConfig to server…
(#21302)
… module
This is moving just enough logic so ConfigCommand does not depend on any
core classes anymore and can be moved to the tools module.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 3 +-
.../main/scala/kafka/network/SocketServer.scala | 48 ++--
.../src/main/scala/kafka/server/ConfigHelper.scala | 6 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 272 +++-----------------
.../main/scala/kafka/server/DynamicConfig.scala | 69 -----
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +-
.../unit/kafka/network/ConnectionQuotasTest.scala | 12 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 28 +-
.../org/apache/kafka/network/SocketServer.java | 42 +++
.../kafka/server/config/DynamicBrokerConfig.java | 281 +++++++++++++++++++++
.../apache/kafka/server/config/DynamicConfig.java | 70 +++++
.../server/config/DynamicBrokerConfigTest.java | 58 +++++
.../kafka/tools/ConfigCommandIntegrationTest.java | 2 +-
13 files changed, 526 insertions(+), 371 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index e731b566f9e..cea7d774525 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,7 +18,6 @@
package kafka.admin
import joptsimple._
-import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions,
ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion,
ScramMechanism => PublicScramMechanism}
@@ -30,7 +29,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity,
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.group.GroupConfig
-import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
+import org.apache.kafka.server.config.{ConfigType, DynamicConfig, QuotaConfig}
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index b9de13d637f..ebc0f990ce1 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -27,7 +27,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse,
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
-import kafka.network.SocketServer._
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
@@ -43,7 +42,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest,
RequestContext, Req
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName,
Reconfigurable}
-import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
+import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledException, SocketServer => JSocketServer,
SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
@@ -92,8 +91,8 @@ class SocketServer(
this.logIdent = logContext.logPrefix
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
- private val memoryPoolDepletedPercentMetricName =
metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
- private val memoryPoolDepletedTimeMetricName =
metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
+ private val memoryPoolDepletedPercentMetricName =
metrics.metricName("MemoryPoolAvgDepletedPercent", JSocketServer.METRICS_GROUP)
+ private val memoryPoolDepletedTimeMetricName =
metrics.metricName("MemoryPoolDepletedTimeTotal", JSocketServer.METRICS_GROUP)
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS,
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false,
memoryPoolSensor) else MemoryPool.NONE
// data-plane
@@ -117,7 +116,7 @@ class SocketServer(
metricsGroup.newGauge(s"NetworkProcessorAvgIdlePercent", () =>
SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
- metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
+ metrics.metricName("io-wait-ratio", JSocketServer.METRICS_GROUP,
p.metricTags)
}
if (dataPlaneProcessors.isEmpty) {
1.0
@@ -133,7 +132,7 @@ class SocketServer(
metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () =>
SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p
=>
- metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
+ metrics.metricName("expired-connections-killed-count",
JSocketServer.METRICS_GROUP, p.metricTags)
}
expiredConnectionsKilledCountMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m =>
m.metricValue.asInstanceOf[Double])
@@ -311,7 +310,7 @@ class SocketServer(
}
}
- override def reconfigurableConfigs: Set[String] =
SocketServer.ReconfigurableConfigs
+ override def reconfigurableConfigs: util.Set[String] =
JSocketServer.RECONFIGURABLE_CONFIGS
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -354,23 +353,6 @@ class SocketServer(
}
}
-object SocketServer {
- val MetricsGroup = "socket-server-metrics"
-
- val ReconfigurableConfigs: Set[String] = Set(
- SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
- SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
- SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
- SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
-
- val ListenerReconfigurableConfigs: Set[String] =
Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
-
- def closeSocket(channel: SocketChannel): Unit = {
- Utils.closeQuietly(channel.socket, "channel socket")
- Utils.closeQuietly(channel, "channel")
- }
-}
-
object DataPlaneAcceptor {
val ListenerReconfigurableConfigs: Set[String] =
Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
}
@@ -613,7 +595,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
// The serverChannel will be null if Acceptor's thread is not started
Utils.closeQuietly(serverChannel, "Acceptor serverChannel")
Utils.closeQuietly(nioSelector, "Acceptor nioSelector")
- throttledSockets.foreach(throttledSocket =>
closeSocket(throttledSocket.socket))
+ throttledSockets.foreach(throttledSocket =>
JSocketServer.closeSocket(throttledSocket.socket))
throttledSockets.clear()
}
@@ -720,7 +702,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
while (throttledSockets.headOption.exists(_.endThrottleTimeMs < timeMs)) {
val closingSocket = throttledSockets.dequeue()
debug(s"Closing socket from ip ${closingSocket.socket.getRemoteAddress}")
- closeSocket(closingSocket.socket)
+ JSocketServer.closeSocket(closingSocket.socket)
}
}
@@ -852,7 +834,7 @@ private[kafka] class Processor(
).asJava
metricsGroup.newGauge(IdlePercentMetricName, () => {
- Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup,
metricTags))).fold(0.0)(m =>
+ Option(metrics.metric(metrics.metricName("io-wait-ratio",
JSocketServer.METRICS_GROUP, metricTags))).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
},
// for compatibility, only add a networkProcessor tag to the Yammer
Metrics alias (the equivalent Selector metric
@@ -861,7 +843,7 @@ private[kafka] class Processor(
)
private val expiredConnectionsKilledCount = new CumulativeSum()
- private val expiredConnectionsKilledCountMetricName =
metrics.metricName("expired-connections-killed-count", MetricsGroup, metricTags)
+ private val expiredConnectionsKilledCountMetricName =
metrics.metricName("expired-connections-killed-count",
JSocketServer.METRICS_GROUP, metricTags)
metrics.addMetric(expiredConnectionsKilledCountMetricName,
expiredConnectionsKilledCount)
private[network] val selector = createSelector(
@@ -1352,7 +1334,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate:
Option[Int]): Unit = synchronized {
def isIpConnectionRateMetric(metricName: MetricName) = {
metricName.name == ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME &&
- metricName.group == MetricsGroup &&
+ metricName.group == JSocketServer.METRICS_GROUP &&
metricName.tags.containsKey(ConnectionQuotaEntity.IP_METRIC_TAG)
}
@@ -1620,7 +1602,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
private def connectionRateMetricName(connectionQuotaEntity:
ConnectionQuotaEntity): MetricName = {
metrics.metricName(
connectionQuotaEntity.metricName,
- MetricsGroup,
+ JSocketServer.METRICS_GROUP,
s"Tracking rate of accepting new connections (per second)",
connectionQuotaEntity.metricTags)
}
@@ -1653,7 +1635,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
}
override def reconfigurableConfigs(): util.Set[String] = {
- SocketServer.ListenerReconfigurableConfigs.asJava
+ JSocketServer.LISTENER_RECONFIGURABLE_CONFIGS
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit =
{
@@ -1696,7 +1678,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
private def createConnectionRateThrottleSensor(throttlePrefix: String):
Sensor = {
val sensor =
metrics.sensor(s"${throttlePrefix}ConnectionRateThrottleTime-${listener.value}")
val metricName =
metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
- MetricsGroup,
+ JSocketServer.METRICS_GROUP,
"Tracking average throttle-time, out of non-zero throttle times, per
listener",
Map(ListenerMetricTag -> listener.value).asJava)
sensor.add(metricName, new Avg)
@@ -1711,7 +1693,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
if (channel != null) {
log.debug(s"Closing connection from
${channel.socket.getRemoteSocketAddress}")
dec(listenerName, channel.socket.getInetAddress)
- closeSocket(channel)
+ JSocketServer.closeSocket(channel)
}
}
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 743937b54fc..5c32c14eb2d 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -35,7 +35,7 @@ import
org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig
-import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.config.{DynamicBrokerConfig,
ServerTopicConfigSynonyms}
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig
@@ -246,7 +246,7 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
.filter(perBrokerConfig || _.source ==
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else
allSynonyms.head.source
- val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
+ val readOnly = !DynamicBrokerConfig.ALL_DYNAMIC_CONFIGS.contains(name)
val dataType = configResponseType(configEntryType)
val configDocumentation = if (includeDocumentation)
brokerDocumentation(name) else null
@@ -274,7 +274,7 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
}
private def brokerSynonyms(name: String): List[String] = {
- DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride =
true)
+ DynamicBrokerConfig.brokerConfigSynonyms(name, true).asScala.toList
}
private def brokerDocumentation(name: String): String = {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 27156831b25..a467f18ace7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,14 +22,13 @@ import java.util.{Collections, Properties}
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.log.LogManager
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.DataPlaneAcceptor
import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
import kafka.utils.Logging
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.Endpoint
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, ConfigResource, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, SslConfigs}
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@@ -37,20 +36,17 @@ import
org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.utils.{BufferSupplier, ConfigUtils, Utils}
import org.apache.kafka.config
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.network.SocketServer
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig,
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{DynamicConfig,
DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs,
DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry,
ClientTelemetryExporterProvider}
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}
+import org.apache.kafka.storage.internals.log.LogConfig
import scala.util.Using
import scala.collection._
@@ -90,117 +86,8 @@ import scala.jdk.CollectionConverters._
*/
object DynamicBrokerConfig {
- private[server] val DynamicSecurityConfigs =
SslConfigs.RECONFIGURABLE_CONFIGS.asScala
- private[server] val DynamicProducerStateManagerConfig =
Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
-
- val AllDynamicConfigs = DynamicSecurityConfigs ++
- LogCleaner.RECONFIGURABLE_CONFIGS.asScala ++
- DynamicLogConfig.ReconfigurableConfigs ++
- DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala ++
- Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++
- DynamicListenerConfig.ReconfigurableConfigs ++
- SocketServer.ReconfigurableConfigs ++
- DynamicProducerStateManagerConfig ++
- DynamicRemoteLogConfig.ReconfigurableConfigs ++
- DynamicReplicationConfig.ReconfigurableConfigs ++
- Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
- GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
- ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
-
- private val ClusterLevelListenerConfigs =
Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
- private val PerBrokerConfigs = (DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs).diff(
- ClusterLevelListenerConfigs)
- private val ListenerMechanismConfigs = Set(SaslConfigs.SASL_JAAS_CONFIG,
- SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
- SaslConfigs.SASL_LOGIN_CLASS,
- BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
- BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG)
-
private val ReloadableFileConfigs =
Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
- private val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
-
- def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean):
List[String] = {
- name match {
- case ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG |
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG =>
- List(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG)
- case ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG |
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG =>
- List(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG)
- case ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG => //
KafkaLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG is used as default
- List(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG,
ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG)
- case ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG |
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG |
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG =>
- List(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG,
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG)
- case ListenerConfigRegex(baseName) if matchListenerOverride =>
- // `ListenerMechanismConfigs` are specified as
listenerPrefix.mechanism.<configName>
- // and other listener configs are specified as
listenerPrefix.<configName>
- // Add <configName> as a synonym in both cases.
- val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith)
- List(name, mechanismConfig.getOrElse(baseName))
- case _ => List(name)
- }
- }
-
- def validateConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
- def checkInvalidProps(invalidPropNames: Set[String], errorMessage:
String): Unit = {
- if (invalidPropNames.nonEmpty)
- throw new ConfigException(s"$errorMessage: $invalidPropNames")
- }
- checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs
dynamically")
- checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
- "These security configs can be dynamically updated only per-listener
using the listener prefix")
- validateConfigTypes(props)
- if (!perBrokerConfig) {
- checkInvalidProps(perBrokerConfigs(props),
- "Cannot update these configs at default cluster level, broker id must
be specified")
- }
- }
-
- private def perBrokerConfigs(props: Properties): Set[String] = {
- val configNames = props.asScala.keySet
- def perBrokerListenerConfig(name: String): Boolean = {
- name match {
- case ListenerConfigRegex(baseName) =>
!ClusterLevelListenerConfigs.contains(baseName)
- case _ => false
- }
- }
- configNames.intersect(PerBrokerConfigs) ++
configNames.filter(perBrokerListenerConfig)
- }
-
- private def nonDynamicConfigs(props: Properties): Set[String] = {
- props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
- }
-
- private def securityConfigsWithoutListenerPrefix(props: Properties):
Set[String] = {
- DynamicSecurityConfigs.filter(props.containsKey)
- }
-
- private def validateConfigTypes(props: Properties): Unit = {
- val baseProps = new Properties
- props.asScala.foreach {
- case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
- case (k, v) => baseProps.put(k, v)
- }
- DynamicConfig.Broker.validate(baseProps)
- }
-
- private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
- AllDynamicConfigs.map { name =>
- val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else
"cluster-wide"
- name -> mode
- }.toMap.asJava
- }
-
- private[server] def resolveVariableConfigs(propsOriginal: Properties):
Properties = {
- val props = new Properties
- val config = new AbstractConfig(new ConfigDef(), propsOriginal,
Utils.castToStringObjectMap(propsOriginal), false)
- config.originals.forEach { (key, value) =>
- if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
- props.put(key, value)
- }
- }
- props
- }
-
private[server] def readDynamicBrokerConfigsFromSnapshot(
raftManager: KafkaRaftManager[ApiMessageAndVersion],
config: KafkaConfig,
@@ -233,7 +120,7 @@ object DynamicBrokerConfig {
batch.forEach { record =>
if (record.message().apiKey() ==
MetadataRecordType.CONFIG_RECORD.id) {
val configRecord = record.message().asInstanceOf[ConfigRecord]
- if
(DynamicBrokerConfig.AllDynamicConfigs.contains(configRecord.name()) &&
+ if
(JDynamicBrokerConfig.ALL_DYNAMIC_CONFIGS.contains(configRecord.name()) &&
configRecord.resourceType() ==
ConfigResource.Type.BROKER.id()) {
if (configRecord.resourceName().isEmpty) {
putOrRemoveIfNull(dynamicDefaultConfigs,
configRecord.name(), configRecord.value())
@@ -339,14 +226,14 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
- verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
+ verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
reconfigurables.add(reconfigurable)
}
def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable):
Unit = {
- verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
+ verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables.add(new BrokerReconfigurable {
- override def reconfigurableConfigs: Set[String] =
reconfigurable.reconfigurableConfigs().asScala
+ override def reconfigurableConfigs: util.Set[String] =
reconfigurable.reconfigurableConfigs
override def validateReconfiguration(newConfig: KafkaConfig): Unit =
reconfigurable.validateReconfiguration(newConfig)
@@ -363,8 +250,9 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
reconfigurables.remove(reconfigurable)
}
- private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = {
- val nonDynamic =
configNames.intersect(DynamicConfig.Broker.nonDynamicProps)
+ private def verifyReconfigurableConfigs(configNames: util.Set[String]): Unit
= {
+ val nonDynamic = new util.HashSet(configNames)
+ nonDynamic.retainAll(DynamicConfig.Broker.nonDynamicProps)
require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs
$nonDynamic")
}
@@ -436,17 +324,17 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
// Remove all invalid configs from `props`
removeInvalidConfigs(props, perBrokerConfig)
- def removeInvalidProps(invalidPropNames: Set[String], errorMessage:
String): Unit = {
- if (invalidPropNames.nonEmpty) {
- invalidPropNames.foreach(props.remove)
+ def removeInvalidProps(invalidPropNames: util.Set[String], errorMessage:
String): Unit = {
+ if (!invalidPropNames.isEmpty) {
+ invalidPropNames.forEach(name => props.remove(name))
error(s"$errorMessage: $invalidPropNames")
}
}
- removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be
ignored")
- removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
+ removeInvalidProps(JDynamicBrokerConfig.nonDynamicConfigs(props),
"Non-dynamic configs will be ignored")
+
removeInvalidProps(JDynamicBrokerConfig.securityConfigsWithoutListenerPrefix(props),
"Security configs can be dynamically updated only using listener prefix,
base configs will be ignored")
if (!perBrokerConfig)
- removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined
at default cluster level will be ignored")
+ removeInvalidProps(JDynamicBrokerConfig.perBrokerConfigs(props),
"Per-broker configs defined at default cluster level will be ignored")
props
}
@@ -458,8 +346,8 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
* Note: The caller must acquire the read or write lock before invoking this
method.
*/
private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig:
Boolean): Map[String, String] = {
- val propsResolved =
DynamicBrokerConfig.resolveVariableConfigs(propsOverride)
- validateConfigs(propsResolved, perBrokerConfig)
+ val propsResolved =
JDynamicBrokerConfig.resolveVariableConfigs(propsOverride)
+ JDynamicBrokerConfig.validateConfigs(propsResolved, perBrokerConfig)
val newProps = mutable.Map[String, String]()
newProps ++= staticBrokerConfigs
if (perBrokerConfig) {
@@ -479,7 +367,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
private def removeInvalidConfigs(props: Properties, perBrokerConfig:
Boolean): Unit = {
try {
- validateConfigTypes(props)
+ JDynamicBrokerConfig.validateConfigTypes(props)
props.asScala
} catch {
case e: Exception =>
@@ -487,7 +375,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
val props1 = new Properties
props1.put(k, v)
try {
- validateConfigTypes(props1)
+ JDynamicBrokerConfig.validateConfigTypes(props1)
false
} catch {
case _: Exception => true
@@ -532,7 +420,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
// so that base configs corresponding to listener configs are not
removed. Base configs should not be removed
// since they may be used by other listeners. It is ok to retain them in
`props` since base configs cannot be
// dynamically updated and listener-specific configs have the higher
precedence.
- brokerConfigSynonyms(k, matchListenerOverride =
false).foreach(props.remove)
+ JDynamicBrokerConfig.brokerConfigSynonyms(k, false).forEach(props.remove)
props.put(k, v)
}
}
@@ -572,7 +460,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
// BrokerReconfigurable updates are processed after config is updated.
Only do the validation here.
val brokerReconfigurablesToUpdate =
mutable.Buffer[BrokerReconfigurable]()
brokerReconfigurables.forEach { reconfigurable =>
- if
(needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava,
changeMap.keySet, deletedKeySet)) {
+ if (needsReconfiguration(reconfigurable.reconfigurableConfigs,
changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
brokerReconfigurablesToUpdate += reconfigurable
@@ -641,27 +529,17 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
*/
trait BrokerReconfigurable {
- def reconfigurableConfigs: Set[String]
+ def reconfigurableConfigs: util.Set[String]
def validateReconfiguration(newConfig: KafkaConfig): Unit
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
-object DynamicLogConfig {
- /**
- * The broker configurations pertaining to logs that are reconfigurable.
This set contains
- * the names you would use when setting a static or dynamic broker
configuration (not topic
- * configuration).
- */
- val ReconfigurableConfigs: Set[String] =
- ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.values.toSet
-}
-
class DynamicLogConfig(logManager: LogManager) extends BrokerReconfigurable
with Logging {
- override def reconfigurableConfigs: Set[String] = {
- DynamicLogConfig.ReconfigurableConfigs
+ override def reconfigurableConfigs: util.Set[String] = {
+ JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -728,8 +606,8 @@ class DynamicLogConfig(logManager: LogManager) extends
BrokerReconfigurable with
class ControllerDynamicThreadPool(controller: ControllerServer) extends
BrokerReconfigurable {
- override def reconfigurableConfigs: Set[String] = {
- Set(ServerConfigs.NUM_IO_THREADS_CONFIG)
+ override def reconfigurableConfigs: util.Set[String] = {
+ util.Set.of(ServerConfigs.NUM_IO_THREADS_CONFIG)
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -744,8 +622,8 @@ class ControllerDynamicThreadPool(controller:
ControllerServer) extends BrokerRe
class BrokerDynamicThreadPool(server: KafkaBroker) extends
BrokerReconfigurable {
- override def reconfigurableConfigs: Set[String] = {
- DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala
+ override def reconfigurableConfigs: util.Set[String] = {
+ DynamicThreadPool.RECONFIGURABLE_CONFIGS
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -880,60 +758,6 @@ class DynamicMetricReporterState(brokerId: Int, config:
KafkaConfig, metrics: Me
}
}
-object DynamicListenerConfig {
- /**
- * The set of configurations which the DynamicListenerConfig object listens
for. Many of
- * these are also monitored by other objects such as ChannelBuilders and
SocketServers.
- */
- val ReconfigurableConfigs = Set(
- // Listener configs
- SocketServerConfigs.LISTENERS_CONFIG,
- SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-
- // SSL configs
- BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
- SslConfigs.SSL_PROTOCOL_CONFIG,
- SslConfigs.SSL_PROVIDER_CONFIG,
- SslConfigs.SSL_CIPHER_SUITES_CONFIG,
- SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
- SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
- SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
- SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
- SslConfigs.SSL_KEY_PASSWORD_CONFIG,
- SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
- SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
- SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
- SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
- SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
- SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
- SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
- BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
- SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG,
-
- // SASL configs
- BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
- SaslConfigs.SASL_JAAS_CONFIG,
- BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
- SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
- SaslConfigs.SASL_KERBEROS_KINIT_CMD,
- SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
- SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER,
- SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
- BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
- SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR,
- SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER,
- SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
- SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
-
- // Connection limit configs
- SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
- SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
-
- // Network threads
- SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG
- )
-}
-
class DynamicClientQuotaCallback(
quotaManagers: QuotaFactory.QuotaManagers,
serverConfig: KafkaConfig
@@ -974,8 +798,8 @@ class DynamicClientQuotaCallback(
class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable
with Logging {
- override def reconfigurableConfigs: Set[String] = {
- DynamicListenerConfig.ReconfigurableConfigs
+ override def reconfigurableConfigs: util.Set[String] = {
+ JDynamicBrokerConfig.DynamicListenerConfig.RECONFIGURABLE_CONFIGS
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -991,7 +815,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String):
Map[String, AnyRef] = {
kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case
(key, _) =>
// skip the reconfigurable configs
- !DynamicSecurityConfigs.contains(key) &&
!SocketServer.ListenerReconfigurableConfigs.contains(key) &&
!DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)
+ !JDynamicBrokerConfig.DYNAMIC_SECURITY_CONFIGS.contains(key) &&
!SocketServer.LISTENER_RECONFIGURABLE_CONFIGS.contains(key) &&
!DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)
}
}
if (immutableListenerConfigs(newConfig, listenerName.configPrefix) !=
immutableListenerConfigs(oldConfig, listenerName.configPrefix))
@@ -1022,8 +846,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
}
class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable
with Logging {
- override def reconfigurableConfigs: Set[String] = {
- DynamicRemoteLogConfig.ReconfigurableConfigs
+ override def reconfigurableConfigs: util.Set[String] = {
+ JDynamicBrokerConfig.DynamicRemoteLogConfig.RECONFIGURABLE_CONFIGS
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -1124,23 +948,9 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends
BrokerReconfigurable w
}
}
-object DynamicRemoteLogConfig {
- val ReconfigurableConfigs = Set(
- RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
- RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
- RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
- RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
- )
-}
-
class DynamicReplicationConfig(server: KafkaBroker) extends
BrokerReconfigurable with Logging {
- override def reconfigurableConfigs: Set[String] = {
- DynamicReplicationConfig.ReconfigurableConfigs
+ override def reconfigurableConfigs: util.Set[String] = {
+ JDynamicBrokerConfig.DynamicReplicationConfig.RECONFIGURABLE_CONFIGS
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
@@ -1151,9 +961,3 @@ class DynamicReplicationConfig(server: KafkaBroker)
extends BrokerReconfigurable
// Currently it is a noop for reconfiguring the dynamic config
follower.fetch.last.tiered.offset.enable
}
}
-
-object DynamicReplicationConfig {
- val ReconfigurableConfigs = Set(
- ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG
- )
-}
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala
b/core/src/main/scala/kafka/server/DynamicConfig.scala
deleted file mode 100644
index ad48b904c13..00000000000
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.server.DynamicBrokerConfig.AllDynamicConfigs
-
-import java.util.Properties
-import org.apache.kafka.common.config.ConfigDef
-import org.apache.kafka.server.config.QuotaConfig
-
-import java.util
-import scala.jdk.CollectionConverters._
-
-/**
- * Class used to hold dynamic configs. These are configs which have no
physical manifestation in the server.properties
- * and can only be set dynamically.
- */
-object DynamicConfig {
- object Broker {
- private val brokerConfigs = {
- val configs = QuotaConfig.brokerQuotaConfigs()
-
- // Filter and define all dynamic configurations
- KafkaConfig.configKeys
- .filter { case (configName, _) =>
AllDynamicConfigs.contains(configName) }
- .foreach { case (_, config) => configs.define(config) }
- configs
- }
-
- // In order to avoid circular reference, all DynamicBrokerConfig's
variables which are initialized by `DynamicConfig.Broker` should be moved to
`DynamicConfig.Broker`.
- // Otherwise, those variables of DynamicBrokerConfig will see intermediate
state of `DynamicConfig.Broker`, because `brokerConfigs` is created by
`DynamicBrokerConfig.AllDynamicConfigs`
- val nonDynamicProps: Set[String] = KafkaConfig.configNames.toSet --
brokerConfigs.names.asScala
-
- def configKeys: util.Map[String, ConfigDef.ConfigKey] =
brokerConfigs.configKeys
-
- def names: util.Set[String] = brokerConfigs.names
-
- def validate(props: Properties): util.Map[String, AnyRef] =
DynamicConfig.validate(brokerConfigs, props, customPropsAllowed = true)
- }
-
-
- private def validate(configDef: ConfigDef, props: Properties,
customPropsAllowed: Boolean) = {
- // Validate Names
- val names = configDef.names
- val propKeys = props.keySet.asScala.map(_.asInstanceOf[String])
- if (!customPropsAllowed) {
- val unknownKeys = propKeys.filterNot(names.contains(_))
- require(unknownKeys.isEmpty, s"Unknown Dynamic Configuration:
$unknownKeys.")
- }
- val propResolved = DynamicBrokerConfig.resolveVariableConfigs(props)
- // ValidateValues
- configDef.parse(propResolved)
- }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8c3c6094e68..740130cfac2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
-import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig,
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, DynamicConfig,
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs,
DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
@@ -58,7 +58,7 @@ object KafkaConfig {
def main(args: Array[String]): Unit = {
System.out.println(configDef.toHtml(4, (config: String) =>
"brokerconfigs_" + config,
- DynamicBrokerConfig.dynamicConfigUpdateModes))
+ JDynamicBrokerConfig.dynamicConfigUpdateModes))
}
val configDef = AbstractKafkaConfig.CONFIG_DEF
@@ -95,7 +95,7 @@ object KafkaConfig {
typeOf(configName) match {
case Some(t) => Some(t)
case None =>
- DynamicBrokerConfig.brokerConfigSynonyms(configName,
matchListenerOverride = true).flatMap(typeOf).headOption
+ JDynamicBrokerConfig.brokerConfigSynonyms(configName,
true).asScala.flatMap(typeOf).headOption
}
}
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 2d81c2a773b..0c0d7581e81 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
import org.apache.kafka.common.network._
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.network.{ConnectionThrottledException,
SocketServerConfigs, TooManyConnectionsException}
+import org.apache.kafka.network.{ConnectionThrottledException, SocketServer,
SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
@@ -830,7 +830,7 @@ class ConnectionQuotasTest {
private def listenerConnThrottleMetric(listener: String) : KafkaMetric = {
val metricName = metrics.metricName(
"connection-accept-throttle-time",
- SocketServer.MetricsGroup,
+ SocketServer.METRICS_GROUP,
util.Map.of(Processor.ListenerMetricTag, listener))
metrics.metric(metricName)
}
@@ -838,7 +838,7 @@ class ConnectionQuotasTest {
private def ipConnThrottleMetric(listener: String): KafkaMetric = {
val metricName = metrics.metricName(
"ip-connection-accept-throttle-time",
- SocketServer.MetricsGroup,
+ SocketServer.METRICS_GROUP,
util.Map.of(Processor.ListenerMetricTag, listener))
metrics.metric(metricName)
}
@@ -846,7 +846,7 @@ class ConnectionQuotasTest {
private def listenerConnRateMetric(listener: String) : KafkaMetric = {
val metricName = metrics.metricName(
"connection-accept-rate",
- SocketServer.MetricsGroup,
+ SocketServer.METRICS_GROUP,
util.Map.of(Processor.ListenerMetricTag, listener))
metrics.metric(metricName)
}
@@ -854,14 +854,14 @@ class ConnectionQuotasTest {
private def brokerConnRateMetric() : KafkaMetric = {
val metricName = metrics.metricName(
s"broker-connection-accept-rate",
- SocketServer.MetricsGroup)
+ SocketServer.METRICS_GROUP)
metrics.metric(metricName)
}
private def ipConnRateMetric(ip: String): KafkaMetric = {
val metricName = metrics.metricName(
s"connection-accept-rate",
- SocketServer.MetricsGroup,
+ SocketServer.METRICS_GROUP,
util.Map.of("ip", ip))
metrics.metric(metricName)
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 431809129c9..ea69a599a15 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.network.{SocketServer => JSocketServer,
SocketServerConfigs}
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
@@ -326,7 +326,7 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.removeReconfigurable(reconfigurable)
val brokerReconfigurable = new BrokerReconfigurable {
- override def reconfigurableConfigs: collection.Set[String] =
Set(CleanerConfig.LOG_CLEANER_THREADS_PROP)
+ override def reconfigurableConfigs: util.Set[String] =
util.Set.of(CleanerConfig.LOG_CLEANER_THREADS_PROP)
override def validateReconfiguration(newConfig: KafkaConfig): Unit =
validateLogCleanerConfig(newConfig.originals)
override def reconfigure(oldConfig: KafkaConfig, newConfig:
KafkaConfig): Unit = {}
}
@@ -351,7 +351,7 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.addReconfigurable(createReconfigurable(validReconfigurableProps))
def createBrokerReconfigurable(configs: Set[String]) = new
BrokerReconfigurable {
- override def reconfigurableConfigs: collection.Set[String] = configs
+ override def reconfigurableConfigs: util.Set[String] = configs.asJava
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {}
override def reconfigure(oldConfig: KafkaConfig, newConfig:
KafkaConfig): Unit = {}
}
@@ -510,7 +510,7 @@ class DynamicBrokerConfigTest {
when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
when(kafkaServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
-
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+
when(socketServer.reconfigurableConfigs).thenReturn(JSocketServer.RECONFIGURABLE_CONFIGS)
when(kafkaServer.socketServer).thenReturn(socketServer)
val logManager: LogManager = mock(classOf[LogManager])
val producerStateManagerConfig: ProducerStateManagerConfig =
mock(classOf[ProducerStateManagerConfig])
@@ -557,7 +557,7 @@ class DynamicBrokerConfigTest {
when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
when(controllerServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
-
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+
when(socketServer.reconfigurableConfigs).thenReturn(JSocketServer.RECONFIGURABLE_CONFIGS)
when(controllerServer.socketServer).thenReturn(socketServer)
val authorizer = new TestAuthorizer
@@ -603,7 +603,7 @@ class DynamicBrokerConfigTest {
when(quotaManagers.clientQuotaCallbackPlugin).thenReturn(Optional.empty())
when(controllerServer.quotaManagers).thenReturn(quotaManagers)
val socketServer: SocketServer = mock(classOf[SocketServer])
-
when(socketServer.reconfigurableConfigs).thenReturn(SocketServer.ReconfigurableConfigs)
+
when(socketServer.reconfigurableConfigs).thenReturn(JSocketServer.RECONFIGURABLE_CONFIGS)
when(controllerServer.socketServer).thenReturn(socketServer)
val authorizer = new TestAuthorizer
@@ -616,18 +616,6 @@ class DynamicBrokerConfigTest {
assertEquals("User:admin", authorizer.superUsers)
}
- @Test
- def testSynonyms(): Unit = {
- assertEquals(List("listener.name.secure.ssl.keystore.type",
"ssl.keystore.type"),
-
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type",
matchListenerOverride = true))
- assertEquals(List("listener.name.sasl_ssl.plain.sasl.jaas.config",
"sasl.jaas.config"),
-
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config",
matchListenerOverride = true))
- assertEquals(List("some.config"),
- DynamicBrokerConfig.brokerConfigSynonyms("some.config",
matchListenerOverride = true))
- assertEquals(List(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG),
-
DynamicBrokerConfig.brokerConfigSynonyms(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
matchListenerOverride = true))
- }
-
@Test
def testImproperConfigsAreRemoved(): Unit = {
val props = TestUtils.createBrokerConfig(0)
@@ -1153,8 +1141,8 @@ class DynamicBrokerConfigTest {
class TestDynamicThreadPool extends BrokerReconfigurable {
- override def reconfigurableConfigs: Set[String] = {
- DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala
+ override def reconfigurableConfigs: util.Set[String] = {
+ DynamicThreadPool.RECONFIGURABLE_CONFIGS
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
diff --git a/server/src/main/java/org/apache/kafka/network/SocketServer.java
b/server/src/main/java/org/apache/kafka/network/SocketServer.java
new file mode 100644
index 00000000000..582ba5c3cf6
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/SocketServer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+public class SocketServer {
+
+ public static final String METRICS_GROUP = "socket-server-metrics";
+
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+ SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
+ SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
+ SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+ SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+
+ public static final Set<String> LISTENER_RECONFIGURABLE_CONFIGS = Set.of(
+ SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+ SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+
+ public static void closeSocket(SocketChannel channel) {
+ Utils.closeQuietly(channel.socket(), "channel socket");
+ Utils.closeQuietly(channel, "channel");
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
new file mode 100644
index 00000000000..a45c65a9b9b
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.network.SocketServer;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.DynamicThreadPool;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.storage.internals.log.LogCleaner;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class DynamicBrokerConfig {
+
+ public static final Set<String> DYNAMIC_SECURITY_CONFIGS =
SslConfigs.RECONFIGURABLE_CONFIGS;
+
+ private static final Set<String> DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS =
Set.of(
+ TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
+
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+
+ private static final Set<String> CLUSTER_LEVEL_LISTENER_CONFIGS = Set.of(
+ SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+ SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
+ SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);
+
+ private static final Set<String> PER_BROKER_CONFIGS = Stream.of(
+ DYNAMIC_SECURITY_CONFIGS,
+ DynamicListenerConfig.RECONFIGURABLE_CONFIGS)
+ .flatMap(Collection::stream)
+ .filter(c -> !CLUSTER_LEVEL_LISTENER_CONFIGS.contains(c))
+ .collect(Collectors.toUnmodifiableSet());
+
+ public static final Set<String> ALL_DYNAMIC_CONFIGS = Stream.of(
+ DYNAMIC_SECURITY_CONFIGS,
+ LogCleaner.RECONFIGURABLE_CONFIGS,
+ DynamicLogConfig.RECONFIGURABLE_CONFIGS,
+ DynamicThreadPool.RECONFIGURABLE_CONFIGS,
+ List.of(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG),
+ DynamicListenerConfig.RECONFIGURABLE_CONFIGS,
+ SocketServer.RECONFIGURABLE_CONFIGS,
+ DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS,
+ DynamicRemoteLogConfig.RECONFIGURABLE_CONFIGS,
+ DynamicReplicationConfig.RECONFIGURABLE_CONFIGS,
+ List.of(AbstractConfig.CONFIG_PROVIDERS_CONFIG),
+ GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS,
+ ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toUnmodifiableSet());
+
+ private static final Set<String> LISTENER_MECHANISM_CONFIGS = Set.of(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ SaslConfigs.SASL_LOGIN_CLASS,
+ BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
+ BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG);
+
+ private static final Pattern LISTENER_CONFIG_REGEX =
Pattern.compile("listener\\.name\\.[^.]*\\.(.*)");
+
+ public static List<String> brokerConfigSynonyms(String name, boolean
matchListenerOverride) {
+ List<String> logRollConfigs =
List.of(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG);
+ List<String> logRollJitterConfigs =
List.of(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG);
+ List<String> logRetentionConfigs =
List.of(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG,
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG);
+ List<String> logFlushConfigs =
List.of(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG,
ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+ if (logRollConfigs.contains(name)) {
+ return logRollConfigs;
+ } else if (logRollJitterConfigs.contains(name)) {
+ return logRollJitterConfigs;
+ } else if (ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG.equals(name))
{ // KafkaLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG is used as default
+ return logFlushConfigs;
+ } else if (logRetentionConfigs.contains(name)) {
+ return logRetentionConfigs;
+ } else if (matchListenerOverride) {
+ Matcher matcher = LISTENER_CONFIG_REGEX.matcher(name);
+ if (matcher.matches()) {
+ String baseName = matcher.group(1);
+ // `ListenerMechanismConfigs` are specified as
listenerPrefix.mechanism.<configName>
+ // and other listener configs are specified as
listenerPrefix.<configName>
+ // Add <configName> as a synonym in both cases.
+ Optional<String> mechanismConfig =
LISTENER_MECHANISM_CONFIGS.stream().filter(baseName::endsWith).findFirst();
+ return List.of(name, mechanismConfig.orElse(baseName));
+ }
+ }
+ return List.of(name);
+ }
+
+ private static void checkInvalidProps(Set<String> invalidPropNames, String
errorMessage) {
+ if (!invalidPropNames.isEmpty()) {
+ throw new ConfigException(errorMessage + ": " + invalidPropNames);
+ }
+ }
+
+ public static void validateConfigs(Properties props, boolean
perBrokerConfig) {
+ checkInvalidProps(nonDynamicConfigs(props), "Cannot update these
configs dynamically");
+ checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
+ "These security configs can be dynamically updated only
per-listener using the listener prefix");
+ validateConfigTypes(props);
+ if (!perBrokerConfig) {
+ checkInvalidProps(perBrokerConfigs(props),
+ "Cannot update these configs at default cluster level,
broker id must be specified");
+ }
+ }
+
+ public static Set<String> securityConfigsWithoutListenerPrefix(Properties
props) {
+ return
DYNAMIC_SECURITY_CONFIGS.stream().filter(props::containsKey).collect(Collectors.toSet());
+ }
+
+ public static void validateConfigTypes(Properties props) {
+ Properties baseProps = new Properties();
+ props.forEach((name, value) -> {
+ Matcher matcher = LISTENER_CONFIG_REGEX.matcher((String) name);
+ if (matcher.matches()) {
+ String baseName = matcher.group(1);
+ baseProps.put(baseName, value);
+ } else {
+ baseProps.put(name, value);
+ }
+ });
+ DynamicConfig.Broker.validate(baseProps);
+ }
+
+ public static Set<String> perBrokerConfigs(Properties props) {
+ Set<String> configNames = props.stringPropertyNames();
+ Set<String> perBrokerConfigs = new HashSet<>();
+ for (String name : configNames) {
+ if (PER_BROKER_CONFIGS.contains(name)) {
+ perBrokerConfigs.add(name);
+ } else {
+ Matcher matcher = LISTENER_CONFIG_REGEX.matcher(name);
+ if (matcher.matches()) {
+ String baseName = matcher.group(1);
+ if (!CLUSTER_LEVEL_LISTENER_CONFIGS.contains(baseName)) {
+ perBrokerConfigs.add(name);
+ }
+ }
+ }
+ }
+ return perBrokerConfigs;
+ }
+
+ public static Set<String> nonDynamicConfigs(Properties props) {
+ Set<String> nonDynamicConfigs = new
HashSet<>(props.stringPropertyNames());
+ nonDynamicConfigs.retainAll(DynamicConfig.Broker.nonDynamicProps());
+ return nonDynamicConfigs;
+ }
+
+ public static Properties resolveVariableConfigs(Properties propsOriginal) {
+ Properties props = new Properties();
+ AbstractConfig config = new AbstractConfig(new ConfigDef(),
propsOriginal, Utils.castToStringObjectMap(propsOriginal), false);
+ config.originals().forEach((key, value) -> {
+ if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
+ props.put(key, value);
+ }
+ });
+ return props;
+ }
+
+ public static Map<String, String> dynamicConfigUpdateModes() {
+ return ALL_DYNAMIC_CONFIGS.stream().collect(Collectors.toMap(
+ Function.identity(),
+ name -> PER_BROKER_CONFIGS.contains(name) ? "per-broker" :
"cluster-wide"
+ )
+ );
+ }
+
+ public static class DynamicLogConfig {
+ /**
+ * The broker configurations pertaining to logs that are
reconfigurable. This set contains
+ * the names you would use when setting a static or dynamic broker
configuration (not topic
+ * configuration).
+ */
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.copyOf(
+ ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values());
+ }
+
+ public static class DynamicListenerConfig {
+ /**
+ * The set of configurations which the DynamicListenerConfig object
listens for. Many of
+ * these are also monitored by other objects such as ChannelBuilders
and SocketServers.
+ */
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+ // Listener configs
+ SocketServerConfigs.LISTENERS_CONFIG,
+ SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+
+ // SSL configs
+ BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
+ SslConfigs.SSL_PROTOCOL_CONFIG,
+ SslConfigs.SSL_PROVIDER_CONFIG,
+ SslConfigs.SSL_CIPHER_SUITES_CONFIG,
+ SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+ SslConfigs.SSL_KEY_PASSWORD_CONFIG,
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
+ SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
+ SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
+ SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
+ BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
+ SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG,
+
+ // SASL configs
+
BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
+ SaslConfigs.SASL_JAAS_CONFIG,
+ BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
+ SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
+ SaslConfigs.SASL_KERBEROS_KINIT_CMD,
+ SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
+ SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER,
+ SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
+
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
+ SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR,
+ SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER,
+ SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
+ SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
+
+ // Connection limit configs
+ SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
+ SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
+
+ // Network threads
+ SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);
+ }
+
+ public static class DynamicRemoteLogConfig {
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+ RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FOLLOWER_THREAD_POOL_SIZE_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP);
+ }
+
+ public static class DynamicReplicationConfig {
+ public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
+
ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG);
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java
b/server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java
new file mode 100644
index 00000000000..e1e7ccfd245
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Class used to hold dynamic configs. These are configs which have no
physical manifestation in the server.properties
+ * and can only be set dynamically.
+ */
+public class DynamicConfig {
+
+ public static class Broker {
+
+ private static final ConfigDef BROKER_CONFIGS;
+ static {
+ ConfigDef configs = QuotaConfig.brokerQuotaConfigs();
+ // Filter and define all dynamic configurations
+ AbstractKafkaConfig.CONFIG_DEF.configKeys().forEach((name, value)
-> {
+ if (DynamicBrokerConfig.ALL_DYNAMIC_CONFIGS.contains(name)) {
+ configs.define(value);
+ }
+ });
+ BROKER_CONFIGS = configs;
+ }
+
+ // In order to avoid circular reference, all DynamicBrokerConfig's
variables which are initialized by `DynamicConfig.Broker` should be moved to
`DynamicConfig.Broker`.
+ // Otherwise, those variables of DynamicBrokerConfig will see
intermediate state of `DynamicConfig.Broker`, because `BROKER_CONFIGS` is
created by `DynamicBrokerConfig.ALL_DYNAMIC_CONFIGS`
+ public static Set<String> nonDynamicProps() {
+ Set<String> nonDynamicProps = new
HashSet<>(AbstractKafkaConfig.CONFIG_DEF.names());
+ nonDynamicProps.removeAll(BROKER_CONFIGS.names());
+ return nonDynamicProps;
+ }
+
+ public static Map<String, ConfigDef.ConfigKey> configKeys() {
+ return BROKER_CONFIGS.configKeys();
+ }
+
+ public static Set<String> names() {
+ return BROKER_CONFIGS.names();
+ }
+
+ public static Map<String, Object> validate(Properties props) {
+ // Validate Names
+ Properties propResolved =
DynamicBrokerConfig.resolveVariableConfigs(props);
+ // ValidateValues
+ return BROKER_CONFIGS.parse(propResolved);
+ }
+ }
+
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/config/DynamicBrokerConfigTest.java
b/server/src/test/java/org/apache/kafka/server/config/DynamicBrokerConfigTest.java
new file mode 100644
index 00000000000..1306185f7a9
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/config/DynamicBrokerConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicBrokerConfigTest {
+
+ @Test
+ public void testBrokerConfigSynonyms() {
+ List<String> logRollTimeConfigs =
List.of(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG);
+ for (String config : logRollTimeConfigs) {
+ assertEquals(logRollTimeConfigs,
DynamicBrokerConfig.brokerConfigSynonyms(config, false));
+ }
+ List<String> logRollJitterConfigs =
List.of(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG,
ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG);
+ for (String config : logRollJitterConfigs) {
+ assertEquals(logRollJitterConfigs,
DynamicBrokerConfig.brokerConfigSynonyms(config, false));
+ }
+ List<String> logFlushConfigs =
List.of(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG,
ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+ assertEquals(logFlushConfigs,
DynamicBrokerConfig.brokerConfigSynonyms(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG,
false));
+ List<String> logRetentionConfigs =
List.of(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG,
ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG,
ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG);
+ for (String config : logRetentionConfigs) {
+ assertEquals(logRetentionConfigs,
DynamicBrokerConfig.brokerConfigSynonyms(config, false));
+ }
+
+ assertEquals(List.of("listener.name.secure.ssl.keystore.type",
"ssl.keystore.type"),
+
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type",
true));
+ assertEquals(List.of("listener.name.sasl_ssl.plain.sasl.jaas.config",
"sasl.jaas.config"),
+
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config",
true));
+ assertEquals(List.of("some.config"),
+ DynamicBrokerConfig.brokerConfigSynonyms("some.config", true));
+
+ assertEquals(List.of("listener.name.NAME.CONFIG", "CONFIG"),
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.NAME.CONFIG", true));
+ assertEquals(List.of("listener.name.NAME.CONFIG"),
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.NAME.CONFIG", false));
+ assertEquals(List.of("listener.name.CONFIG"),
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.CONFIG", true));
+ assertEquals(List.of("listener.name.CONFIG"),
DynamicBrokerConfig.brokerConfigSynonyms("listener.name.CONFIG", false));
+
+ assertEquals(List.of("anything"),
DynamicBrokerConfig.brokerConfigSynonyms("anything", false));
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 9800d9125eb..c221ba05ab1 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -101,7 +101,7 @@ public class ConfigCommandIntegrationTest {
"--entity-type", "brokers",
"--alter",
"--add-config", "security.inter.broker.protocol=PLAINTEXT")),
- errOut -> assertTrue(errOut.contains("Cannot update these configs
dynamically: Set(security.inter.broker.protocol)"), errOut));
+ errOut -> assertTrue(errOut.contains("Cannot update these configs
dynamically: [security.inter.broker.protocol]"), errOut));
}
@ClusterTest