This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6f38fe5e0a6 KAFKA-14588 ZK configuration moved to ZkConfig (#15075) 6f38fe5e0a6 is described below commit 6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45 Author: Nikolay <nizhi...@apache.org> AuthorDate: Wed Mar 27 17:37:01 2024 +0300 KAFKA-14588 ZK configuration moved to ZkConfig (#15075) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- build.gradle | 5 +- checkstyle/import-control.xml | 1 + .../util/clusters/EmbeddedKafkaCluster.java | 3 +- core/src/main/scala/kafka/admin/AclCommand.scala | 8 +- .../src/main/scala/kafka/admin/ConfigCommand.scala | 4 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 7 +- .../kafka/security/authorizer/AclAuthorizer.scala | 7 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 179 ++++++--------------- core/src/main/scala/kafka/server/KafkaServer.scala | 30 ++-- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 8 +- .../api/DescribeAuthorizedOperationsTest.scala | 3 +- .../kafka/api/EndToEndAuthorizationTest.scala | 7 +- .../scala/integration/kafka/api/MetricsTest.scala | 3 +- .../kafka/api/PlaintextAdminIntegrationTest.scala | 4 +- .../kafka/api/SaslMultiMechanismConsumerTest.scala | 4 +- .../kafka/api/SaslPlainPlaintextConsumerTest.scala | 4 +- .../kafka/api/SaslSslAdminIntegrationTest.scala | 3 +- .../kafka/api/SaslSslConsumerTest.scala | 4 +- .../kafka/api/SslAdminIntegrationTest.scala | 5 +- .../server/DynamicBrokerReconfigurationTest.scala | 4 +- .../server/KafkaServerKRaftRegistrationTest.scala | 5 +- ...ListenersWithSameSecurityProtocolBaseTest.scala | 3 +- .../kafka/zk/ZkMigrationIntegrationTest.scala | 16 +- .../test/scala/unit/kafka/KafkaConfigTest.scala | 29 ++-- .../controller/ControllerChannelManagerTest.scala | 3 +- .../kafka/security/authorizer/AuthorizerTest.scala | 127 +++++++-------- .../kafka/server/DynamicBrokerConfigTest.scala | 6 +- .../scala/unit/kafka/server/KafkaConfigTest.scala | 80 ++++----- 
.../scala/unit/kafka/server/KafkaServerTest.scala | 51 +++--- .../unit/kafka/server/ServerShutdownTest.scala | 5 +- .../test/scala/unit/kafka/server/ServerTest.scala | 3 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 6 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 4 +- .../zk/migration/ZkMigrationTestHarness.scala | 3 +- .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 3 +- .../jmh/metadata/MetadataRequestBenchmark.java | 3 +- .../org/apache/kafka/server/config/Defaults.java | 10 -- .../org/apache/kafka/server/config/ZkConfigs.java | 145 +++++++++++++++++ .../integration/utils/EmbeddedKafkaCluster.java | 3 +- .../streams/integration/utils/KafkaEmbedded.java | 3 +- 40 files changed, 439 insertions(+), 362 deletions(-) diff --git a/build.gradle b/build.gradle index 489473c885a..0564d27a013 100644 --- a/build.gradle +++ b/build.gradle @@ -2160,6 +2160,7 @@ project(':streams') { testImplementation project(':storage') testImplementation project(':server-common') testImplementation project(':server-common').sourceSets.test.output + testImplementation project(':server') testImplementation libs.log4j testImplementation libs.junitJupiter testImplementation libs.junitVintageEngine @@ -2742,6 +2743,7 @@ project(':jmh-benchmarks') { exclude group: 'net.sf.jopt-simple', module: 'jopt-simple' } implementation project(':server-common') + implementation project(':server') implementation project(':clients') implementation project(':group-coordinator') implementation project(':metadata') @@ -2899,7 +2901,7 @@ project(':connect:json') { api libs.jacksonAfterburner implementation libs.slf4jApi - + testImplementation libs.junitJupiter testRuntimeOnly libs.slf4jlog4j @@ -2969,6 +2971,7 @@ project(':connect:runtime') { testImplementation project(':metadata') testImplementation project(':core').sourceSets.test.output testImplementation project(':server-common') + testImplementation project(':server') testImplementation project(':storage') testImplementation 
project(':connect:test-plugins') diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 8bbf5728212..616e0e519c6 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -605,6 +605,7 @@ <allow pkg="org.apache.kafka.metadata" /> <allow pkg="org.eclipse.jetty.client"/> <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" /> + <allow class="org.apache.kafka.server.config.ZkConfigs" /> </subpackage> </subpackage> diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index c15aa27ae59..8a06c8e0b0a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.config.ZkConfigs; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,7 +155,7 @@ public class EmbeddedKafkaCluster { } private void doStart() { - brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString()); + brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString()); putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true); putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0); diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 2e867d76dd2..7120d0d8c34 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -21,7 +21,6 @@ import java.util.Properties import joptsimple._ import 
joptsimple.util.EnumConverter import kafka.security.authorizer.{AclAuthorizer, AclEntry} -import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} import org.apache.kafka.common.acl._ @@ -33,6 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils} import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import scala.jdk.CollectionConverters._ @@ -200,7 +200,7 @@ object AclCommand extends Logging { // We will default the value of zookeeper.set.acl to true or false based on whether SASL is configured, // but if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication // then it will be up to the user to explicitly specify zookeeper.set.acl=true in the authorizer-properties. - val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSaslEnabled) + val defaultProps = Map(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP -> JaasUtils.isZkSaslEnabled) val authorizerPropertiesWithoutTls = if (opts.options.has(opts.authorizerPropertiesOpt)) { val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt) @@ -211,7 +211,7 @@ object AclCommand extends Logging { val authorizerProperties = if (opts.options.has(opts.zkTlsConfigFile)) { // load in TLS configs both with and without the "authorizer." prefix - val validKeys = (KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList ++ KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.map("authorizer." + _).toList).asJava + val validKeys = (ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList ++ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.map("authorizer." 
+ _).toList).asJava authorizerPropertiesWithoutTls ++ Utils.loadProps(opts.options.valueOf(opts.zkTlsConfigFile), validKeys).asInstanceOf[java.util.Map[String, Any]].asScala } else @@ -619,7 +619,7 @@ object AclCommand extends Logging { "DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" + " the default authorizer kafka.security.authorizer.AclAuthorizer." + " Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " + - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") + ". Note that if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication being used" + " then it is necessary to explicitly specify --authorizer-properties zookeeper.set.acl=true. " + AclCommand.AuthorizerDeprecationMessage) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index d03d7dfb9b5..a10722f000f 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} -import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals} +import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals, ZkConfigs} import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs} import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.kafka.storage.internals.log.LogConfig @@ -864,7 +864,7 @@ object ConfigCommand extends Logging { 
.ofType(classOf[String]) val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file", "Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " + - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + " are ignored.") + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") + " are ignored.") .withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String]) options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 9c81bb23e2e..dd07f522e77 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -24,6 +24,7 @@ import kafka.utils.Implicits._ import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} import org.apache.zookeeper.KeeperException @@ -81,7 +82,7 @@ object ZkSecurityMigrator extends Logging { if (jaasFile == null && !tlsClientAuthEnabled) { val errorMsg = s"No JAAS configuration file has been specified and no TLS client certificate has been specified. 
Please make sure that you set " + s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide a ZooKeeper client TLS configuration via --$tlsConfigFileOption <filename> " + - s"identifying at least ${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and ${KafkaConfig.ZkSslKeyStoreLocationProp}" + s"identifying at least ${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and ${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP}" System.err.println("ERROR: %s".format(errorMsg)) throw new IllegalArgumentException("Incorrect configuration") } @@ -124,7 +125,7 @@ object ZkSecurityMigrator extends Logging { } def createZkClientConfigFromFile(filename: String) : ZKClientConfig = { - val zkTlsConfigFileProps = Utils.loadProps(filename, KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.asJava) + val zkTlsConfigFileProps = Utils.loadProps(filename, ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.asJava) val zkClientConfig = new ZKClientConfig() // Initializes based on any system properties that have been set // Now override any set system properties with explicitly-provided values from the config file // Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make @@ -156,7 +157,7 @@ object ZkSecurityMigrator extends Logging { "before migration. If not, exit the command.") val zkTlsConfigFile: OptionSpec[String] = parser.accepts(tlsConfigFileOption, "Identifies the file where ZooKeeper client TLS connectivity properties are defined. 
Any properties other than " + - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.mkString(", ") + " are ignored.") + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.mkString(", ") + " are ignored.") .withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String]) options = parser.parse(args : _*) } diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 99642b33c2b..dc307ca702f 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time} import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1 +import org.apache.kafka.server.config.ZkConfigs import org.apache.zookeeper.client.ZKClientConfig import scala.annotation.nowarn @@ -96,7 +97,7 @@ object AclAuthorizer { } private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = { - val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp). + val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP). 
map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean if (!zkSslClientEnable) new ZKClientConfig @@ -105,10 +106,10 @@ object AclAuthorizer { // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true) // add in any prefixed overlays - KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { (kafkaProp, sysProp) => configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue => zkClientConfig.setProperty(sysProp, - if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp) + if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP) (prefixedValue.toString.trim.toUpperCase == "HTTPS").toString else prefixedValue.toString.trim) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4290ce1da8d..cec5f5649f6 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -45,7 +45,7 @@ import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator} import org.apache.kafka.server.common.MetadataVersion._ -import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms} +import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Csv @@ -67,53 +67,15 @@ object KafkaConfig { DynamicBrokerConfig.dynamicConfigUpdateModes)) } - /** ********* Zookeeper Configuration ***********/ - val ZkConnectProp = "zookeeper.connect" - val 
ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms" - val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms" - val ZkEnableSecureAclsProp = "zookeeper.set.acl" - val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests" - val ZkSslClientEnableProp = "zookeeper.ssl.client.enable" - val ZkClientCnxnSocketProp = "zookeeper.clientCnxnSocket" - val ZkSslKeyStoreLocationProp = "zookeeper.ssl.keystore.location" - val ZkSslKeyStorePasswordProp = "zookeeper.ssl.keystore.password" - val ZkSslKeyStoreTypeProp = "zookeeper.ssl.keystore.type" - val ZkSslTrustStoreLocationProp = "zookeeper.ssl.truststore.location" - val ZkSslTrustStorePasswordProp = "zookeeper.ssl.truststore.password" - val ZkSslTrustStoreTypeProp = "zookeeper.ssl.truststore.type" - val ZkSslProtocolProp = "zookeeper.ssl.protocol" - val ZkSslEnabledProtocolsProp = "zookeeper.ssl.enabled.protocols" - val ZkSslCipherSuitesProp = "zookeeper.ssl.cipher.suites" - val ZkSslEndpointIdentificationAlgorithmProp = "zookeeper.ssl.endpoint.identification.algorithm" - val ZkSslCrlEnableProp = "zookeeper.ssl.crl.enable" - val ZkSslOcspEnableProp = "zookeeper.ssl.ocsp.enable" - - // a map from the Kafka config to the corresponding ZooKeeper Java system property - private[kafka] val ZkSslConfigToSystemPropertyMap: Map[String, String] = Map( - ZkSslClientEnableProp -> ZKClientConfig.SECURE_CLIENT, - ZkClientCnxnSocketProp -> ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, - ZkSslKeyStoreLocationProp -> "zookeeper.ssl.keyStore.location", - ZkSslKeyStorePasswordProp -> "zookeeper.ssl.keyStore.password", - ZkSslKeyStoreTypeProp -> "zookeeper.ssl.keyStore.type", - ZkSslTrustStoreLocationProp -> "zookeeper.ssl.trustStore.location", - ZkSslTrustStorePasswordProp -> "zookeeper.ssl.trustStore.password", - ZkSslTrustStoreTypeProp -> "zookeeper.ssl.trustStore.type", - ZkSslProtocolProp -> "zookeeper.ssl.protocol", - ZkSslEnabledProtocolsProp -> "zookeeper.ssl.enabledProtocols", - ZkSslCipherSuitesProp -> 
"zookeeper.ssl.ciphersuites", - ZkSslEndpointIdentificationAlgorithmProp -> "zookeeper.ssl.hostnameVerification", - ZkSslCrlEnableProp -> "zookeeper.ssl.crl", - ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp") - private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = { - Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName))) + Option(clientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName))) } private[kafka] def setZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String, kafkaPropValue: Any): Unit = { - clientConfig.setProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName), + clientConfig.setProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName), kafkaPropName match { - case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString - case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match { + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString + case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => kafkaPropValue match { case list: java.util.List[_] => list.asScala.mkString(",") case _ => kafkaPropValue.toString } @@ -124,9 +86,9 @@ object KafkaConfig { // For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS // with both a client connection socket and a key store location explicitly set. 
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = { - zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") && - zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined && - zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined + zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).contains("true") && + zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP).isDefined && + zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP).isDefined } /** ********* General Configuration ***********/ @@ -439,51 +401,6 @@ object KafkaConfig { val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable" /* Documentation */ - /** ********* Zookeeper Configuration ***********/ - val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " + - "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " + - "down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" + - "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " + - "For example to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>." - val ZkSessionTimeoutMsDoc = "Zookeeper session timeout" - val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to ZooKeeper. 
If not set, the value in " + ZkSessionTimeoutMsProp + " is used" - val ZkEnableSecureAclsDoc = "Set client to use secure ACLs" - val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking." - val ZkSslClientEnableDoc = "Set client to use TLS when connecting to ZooKeeper." + - " An explicit value overrides any value set via the <code>zookeeper.client.secure</code> system property (note the different name)." + - s" Defaults to false if neither is set; when true, <code>$ZkClientCnxnSocketProp</code> must be set (typically to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set may include " + - ZkSslConfigToSystemPropertyMap.keys.toList.filter(x => x != ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).sorted.mkString("<code>", "</code>, <code>", "</code>") - val ZkClientCnxnSocketDoc = "Typically set to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the same-named <code>${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)}</code> system property." - val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreLocationProp)}</code> system property (note the camelCase)." - val ZkSslKeyStorePasswordDoc = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStorePasswordProp)}</code> system property (note the camelCase)." + - " Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail." 
- val ZkSslKeyStoreTypeDoc = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreTypeProp)}</code> system property (note the camelCase)." + - " The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the keystore." - val ZkSslTrustStoreLocationDoc = "Truststore location when using TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreLocationProp)}</code> system property (note the camelCase)." - val ZkSslTrustStorePasswordDoc = "Truststore password when using TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStorePasswordProp)}</code> system property (note the camelCase)." - val ZkSslTrustStoreTypeDoc = "Truststore type when using TLS connectivity to ZooKeeper." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreTypeProp)}</code> system property (note the camelCase)." + - " The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the truststore." - val ZkSslProtocolDoc = "Specifies the protocol to be used in ZooKeeper TLS negotiation." + - s" An explicit value overrides any value set via the same-named <code>${ZkSslConfigToSystemPropertyMap(ZkSslProtocolProp)}</code> system property." - val ZkSslEnabledProtocolsDoc = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslEnabledProtocolsProp)}</code> system property (note the camelCase)." + - s" The default value of <code>null</code> means the enabled protocol will be the value of the <code>${KafkaConfig.ZkSslProtocolProp}</code> configuration property." 
- val ZkSslCipherSuitesDoc = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." + - s""" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslCipherSuitesProp)}</code> system property (note the single word \"ciphersuites\").""" + - " The default value of <code>null</code> means the list of enabled cipher suites is determined by the Java runtime being used." - val ZkSslEndpointIdentificationAlgorithmDoc = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." + - s""" An explicit value overrides any \"true\" or \"false\" value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslEndpointIdentificationAlgorithmProp)}</code> system property (note the different name and values; true implies https and false implies blank).""" - val ZkSslCrlEnableDoc = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslCrlEnableProp)}</code> system property (note the shorter name)." - val ZkSslOcspEnableDoc = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." + - s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslOcspEnableProp)}</code> system property (note the shorter name)." /** ********* General Configuration ***********/ val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed." 
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id" @@ -943,25 +860,25 @@ object KafkaConfig { new ConfigDef() /** ********* Zookeeper Configuration ***********/ - .define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc) - .define(ZkSessionTimeoutMsProp, INT, Defaults.ZK_SESSION_TIMEOUT_MS, HIGH, ZkSessionTimeoutMsDoc) - .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc) - .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZK_ENABLE_SECURE_ACLS, HIGH, ZkEnableSecureAclsDoc) - .define(ZkMaxInFlightRequestsProp, INT, Defaults.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc) - .define(ZkSslClientEnableProp, BOOLEAN, Defaults.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkSslClientEnableDoc) - .define(ZkClientCnxnSocketProp, STRING, null, MEDIUM, ZkClientCnxnSocketDoc) - .define(ZkSslKeyStoreLocationProp, STRING, null, MEDIUM, ZkSslKeyStoreLocationDoc) - .define(ZkSslKeyStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslKeyStorePasswordDoc) - .define(ZkSslKeyStoreTypeProp, STRING, null, MEDIUM, ZkSslKeyStoreTypeDoc) - .define(ZkSslTrustStoreLocationProp, STRING, null, MEDIUM, ZkSslTrustStoreLocationDoc) - .define(ZkSslTrustStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslTrustStorePasswordDoc) - .define(ZkSslTrustStoreTypeProp, STRING, null, MEDIUM, ZkSslTrustStoreTypeDoc) - .define(ZkSslProtocolProp, STRING, Defaults.ZK_SSL_PROTOCOL, LOW, ZkSslProtocolDoc) - .define(ZkSslEnabledProtocolsProp, LIST, null, LOW, ZkSslEnabledProtocolsDoc) - .define(ZkSslCipherSuitesProp, LIST, null, LOW, ZkSslCipherSuitesDoc) - .define(ZkSslEndpointIdentificationAlgorithmProp, STRING, Defaults.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkSslEndpointIdentificationAlgorithmDoc) - .define(ZkSslCrlEnableProp, BOOLEAN, Defaults.ZK_SSL_CRL_ENABLE, LOW, ZkSslCrlEnableDoc) - .define(ZkSslOcspEnableProp, BOOLEAN, Defaults.ZK_SSL_OCSP_ENABLE, LOW, ZkSslOcspEnableDoc) + .define(ZkConfigs.ZK_CONNECT_PROP, STRING, null, HIGH, 
ZkConfigs.ZK_CONNECT_DOC) + .define(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, INT, ZkConfigs.ZK_SESSION_TIMEOUT_MS, HIGH, ZkConfigs.ZK_SESSION_TIMEOUT_MS_DOC) + .define(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, INT, null, HIGH, ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_DOC) + .define(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, BOOLEAN, ZkConfigs.ZK_ENABLE_SECURE_ACLS, HIGH, ZkConfigs.ZK_ENABLE_SECURE_ACLS_DOC) + .define(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP, INT, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_DOC) + .define(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkConfigs.ZK_SSL_CLIENT_ENABLE_DOC) + .define(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_DOC) + .define(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_DOC) + .define(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_DOC) + .define(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_DOC) + .define(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_DOC) + .define(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_DOC) + .define(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_DOC) + .define(ZkConfigs.ZK_SSL_PROTOCOL_PROP, STRING, ZkConfigs.ZK_SSL_PROTOCOL, LOW, ZkConfigs.ZK_SSL_PROTOCOL_DOC) + .define(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, LIST, null, LOW, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_DOC) + .define(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, LIST, null, LOW, ZkConfigs.ZK_SSL_CIPHER_SUITES_DOC) + .define(ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, STRING, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC) + 
.define(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_CRL_ENABLE, LOW, ZkConfigs.ZK_SSL_CRL_ENABLE_DOC) + .define(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_OCSP_ENABLE, LOW, ZkConfigs.ZK_SSL_OCSP_ENABLE_DOC) /** ********* General Configuration ***********/ .define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BROKER_ID_GENERATION_ENABLE, MEDIUM, BrokerIdGenerationEnableDoc) @@ -1417,12 +1334,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami super.valuesWithPrefixOverride(prefix) /** ********* Zookeeper Configuration ***********/ - val zkConnect: String = getString(KafkaConfig.ZkConnectProp) - val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp) + val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_PROP) + val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP) val zkConnectionTimeoutMs: Int = - Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp)) - val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp) - val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp) + Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP)) + val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP) + val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP) private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig @@ -1470,21 +1387,21 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } } - val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslClientEnableProp) - val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkClientCnxnSocketProp) - val zkSslKeyStoreLocation = 
zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreLocationProp) - val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslKeyStorePasswordProp) - val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreTypeProp) - val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreLocationProp) - val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslTrustStorePasswordProp) - val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreTypeProp) - val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslProtocolProp) - val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(KafkaConfig.ZkSslEnabledProtocolsProp) - val ZkSslCipherSuites = zkListConfigOrSystemProperty(KafkaConfig.ZkSslCipherSuitesProp) + val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP) + val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP) + val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP) + val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP) + val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP) + val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP) + val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP) + val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP) + val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_PROTOCOL_PROP) + val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP) + val ZkSslCipherSuites = 
zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP) val ZkSslEndpointIdentificationAlgorithm = { // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided // Need to translate any system property value from true/false to HTTPS/<blank> - val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp + val kafkaProp = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP val actuallyProvided = originals.containsKey(kafkaProp) if (actuallyProvided) getString(kafkaProp) @@ -1496,8 +1413,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } } } - val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslCrlEnableProp) - val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslOcspEnableProp) + val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP) + val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP) /** ********* General Configuration ***********/ val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp) val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) @@ -2040,7 +1957,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } if (requiresZookeeper) { if (zkConnect == null) { - throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.") + throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_PROP}` which has no default value.") } if (brokerIdGenerationEnable) { require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id") @@ -2055,7 +1972,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } if 
(migrationEnabled) { if (zkConnect == null) { - throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${KafkaConfig.ZkConnectProp}` must also be set.") + throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${ZkConfigs.ZK_CONNECT_PROP}` must also be set.") } } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 55c773b0a46..fce61e17a22 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -57,7 +57,7 @@ import org.apache.kafka.server.NodeToControllerChannelManager import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigType, ZkConfigs} import org.apache.kafka.server.fault.LoggingFaultHandler import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics @@ -80,20 +80,20 @@ object KafkaServer { def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = { val clientConfig = new ZKClientConfig if (config.zkSslClientEnable || forceZkSslClientEnable) { - KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true") - config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _)) - config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _)) - config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStorePasswordProp, x.value)) - 
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreTypeProp, _)) - config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreLocationProp, _)) - config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStorePasswordProp, x.value)) - config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreTypeProp, _)) - KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslProtocolProp, config.ZkSslProtocol) - config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEnabledProtocolsProp, _)) - config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCipherSuitesProp, _)) - KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm) - KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString) - KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString) + KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "true") + config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, _)) + config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, _)) + config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, x.value)) + config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, _)) + 
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, _)) + config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, x.value)) + config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, _)) + KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_PROTOCOL_PROP, config.ZkSslProtocol) + config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, _)) + config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, _)) + KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, config.ZkSslEndpointIdentificationAlgorithm) + KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, config.ZkSslCrlEnable.toString) + KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, config.ZkSslOcspEnable.toString) } // The zk sasl is enabled by default so it can produce false error when broker does not intend to use SASL. 
if (!JaasUtils.isZkSaslEnabled) clientConfig.setProperty(JaasUtils.ZK_SASL_CLIENT, "false") diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 26c9453bef2..749de428533 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigType, ZkConfigs} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.storage.internals.log.LogConfig import org.apache.zookeeper.KeeperException.{Code, NodeExistsException} @@ -2349,9 +2349,9 @@ object KafkaZkClient { if (secureAclsEnabled && !isZkSecurityEnabled) throw new java.lang.SecurityException( - s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least " + - s"${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and " + - s"${KafkaConfig.ZkSslKeyStoreLocationProp} was not present and the verification of the JAAS login file failed " + + s"${ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP} is true, but ZooKeeper client TLS configuration identifying at least " + + s"${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and " + + s"${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP} was not present and the verification of the JAAS login file failed " + s"${JaasUtils.zkSecuritySysConfigString}") KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 6ea0c2eaec0..77d0784b1a5 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -26,6 +26,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} @@ -75,7 +76,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS import DescribeAuthorizedOperationsTest._ override val brokerCount = 1 - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName) var client: Admin = _ diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 46e674c00aa..061ba3de9ea 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -18,9 +18,9 @@ package kafka.api import com.yammer.metrics.core.Gauge + import java.util.{Collections, Properties} import java.util.concurrent.ExecutionException - import kafka.security.authorizer.AclAuthorizer import kafka.security.authorizer.AclEntry.WildcardHost import org.apache.kafka.metadata.authorizer.StandardAuthorizer @@ -38,11 +38,12 @@ import org.apache.kafka.common.resource._ import 
org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} import org.apache.kafka.common.security.auth._ +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{ValueSource, CsvSource} +import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import scala.jdk.CollectionConverters._ @@ -156,7 +157,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } else { // The next two configuration parameters enable ZooKeeper secure ACLs // and sets the Kafka authorizer, both necessary to enable security. - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) // Set the specific principal that can update ACLs. 
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index cb4b5b96979..658b57cddd3 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.authenticator.TestJaasConfig +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -43,7 +44,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) private val kafkaServerJaasEntryName = s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false") this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false") this.serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "2.8") this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10") diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index e084454f5ff..fe32106877a 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -44,7 +44,7 @@ import 
org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT -import org.apache.kafka.server.config.Defaults +import org.apache.kafka.server.config.{Defaults, ZkConfigs} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo} @@ -2707,7 +2707,7 @@ object PlaintextAdminIntegrationTest { var topicConfigEntries2 = Seq(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, test.brokers.head.config.brokerId.toString) - val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, "localhost:2181")).asJava + val brokerConfigEntries = Seq(new ConfigEntry(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")).asJava // Alter configs: first and third are invalid, second is valid var alterResult = admin.alterConfigs(Map( diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index 6fc3fb9b8a1..4cf7a320ff2 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -12,10 +12,10 @@ */ package kafka.api -import kafka.server.KafkaConfig import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.params.ParameterizedTest import 
org.junit.jupiter.params.provider.ValueSource @@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { private val kafkaClientSaslMechanism = "PLAIN" private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN") - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index 6a03d51aa36..f039e16faa3 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -13,10 +13,10 @@ package kafka.api import java.util.Locale -import kafka.server.KafkaConfig import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} @Timeout(600) @@ -26,7 +26,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) private val kafkaServerJaasEntryName = s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false") // disable secure acls of 
zkClient in QuorumTestHarness override protected def zkAclsEnabled = Some(false) override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 29b51713257..1d2f4eb8bbf 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} @@ -46,7 +47,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer]) - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala index 0ea458e25aa..19b6e9e4614 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala @@ -12,14 +12,14 @@ */ package kafka.api -import kafka.server.KafkaConfig 
import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} @Timeout(600) class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup { - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 4e6ae3f8132..ff6ea03a677 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -14,10 +14,8 @@ package kafka.api import java.util import java.util.concurrent._ - import com.yammer.metrics.core.Gauge import kafka.security.authorizer.AclAuthorizer -import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult} import org.apache.kafka.common.acl._ @@ -25,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.server.authorizer._ +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} import org.junit.jupiter.api.{AfterEach, Test} @@ -80,7 +79,7 @@ object SslAdminIntegrationTest { class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { override val 
authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer]) - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") override protected def securityProtocol = SecurityProtocol.SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index c984eae0272..dd85bd41cb0 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.kafka.security.PasswordEncoder -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigType, ZkConfigs} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.ShutdownableThread @@ -122,7 +122,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup properties } else { val properties = TestUtils.createBrokerConfig(brokerId, zkConnect) - properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true") + properties.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") properties } props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS) diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala index d4a1e8f1dd1..7bf86259230 
100644 --- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala @@ -25,6 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import org.apache.kafka.common.Uuid import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.Assertions.{assertThrows, fail} import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{Tag, Timeout} @@ -61,7 +62,7 @@ class KafkaServerKRaftRegistrationTest { setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -99,7 +100,7 @@ class KafkaServerKRaftRegistrationTest { setNumBrokerNodes(0). 
setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index 4d7ddc15cb3..16e1a290278 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.coordinator.group.OffsetConfig +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} @@ -79,7 +80,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," + s"$External:PLAINTEXT, $SecureExternal:SASL_SSL") props.put(KafkaConfig.InterBrokerListenerNameProp, Internal) - props.put(KafkaConfig.ZkEnableSecureAclsProp, "true") + props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true") props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism) props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}", kafkaServerSaslMechanisms(SecureInternal).mkString(",")) diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index a3e5ecc0814..ff8d2b797f3 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.server.ControllerRequestCompletionHandler import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock} -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigType, ZkConfigs} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail} import org.junit.jupiter.api.{Assumptions, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -177,7 +177,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -309,7 +309,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -443,7 +443,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). 
setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -508,7 +508,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -576,7 +576,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -636,7 +636,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() @@ -711,7 +711,7 @@ class ZkMigrationIntegrationTest { setNumBrokerNodes(0). 
setNumControllerNodes(1).build()) .setConfigProp(KafkaConfig.MigrationEnabledProp, "true") - .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) + .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect) .build() try { kraftCluster.format() diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index 4cec0d05e5b..3d97e7df171 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -25,6 +25,7 @@ import kafka.utils.TestUtils.assertBadConfigContainingMessage import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.internals.FatalExitError +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions._ @@ -155,7 +156,7 @@ class KafkaTest { "Missing required configuration `zookeeper.connect` which has no default value.") // Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names) - propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") propertiesFile.setProperty(KafkaConfig.ControllerListenerNamesProp, "") KafkaConfig.fromProps(propertiesFile) } @@ -232,61 +233,61 @@ class KafkaTest { @Test def testZkSslClientEnable(): Unit = { - testZkConfig(KafkaConfig.ZkSslClientEnableProp, "zookeeper.ssl.client.enable", + testZkConfig(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "zookeeper.ssl.client.enable", "zookeeper.client.secure", booleanPropValueToSet, config => Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false)) } @Test def testZkSslKeyStoreLocation(): Unit = { - 
testZkConfig(KafkaConfig.ZkSslKeyStoreLocationProp, "zookeeper.ssl.keystore.location", + testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, "zookeeper.ssl.keystore.location", "zookeeper.ssl.keyStore.location", stringPropValueToSet, config => config.zkSslKeyStoreLocation, stringPropValueToSet) } @Test def testZkSslTrustStoreLocation(): Unit = { - testZkConfig(KafkaConfig.ZkSslTrustStoreLocationProp, "zookeeper.ssl.truststore.location", + testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, "zookeeper.ssl.truststore.location", "zookeeper.ssl.trustStore.location", stringPropValueToSet, config => config.zkSslTrustStoreLocation, stringPropValueToSet) } @Test def testZookeeperKeyStorePassword(): Unit = { - testZkConfig(KafkaConfig.ZkSslKeyStorePasswordProp, "zookeeper.ssl.keystore.password", + testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, "zookeeper.ssl.keystore.password", "zookeeper.ssl.keyStore.password", passwordPropValueToSet, config => config.zkSslKeyStorePassword, new Password(passwordPropValueToSet)) } @Test def testZookeeperTrustStorePassword(): Unit = { - testZkConfig(KafkaConfig.ZkSslTrustStorePasswordProp, "zookeeper.ssl.truststore.password", + testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, "zookeeper.ssl.truststore.password", "zookeeper.ssl.trustStore.password", passwordPropValueToSet, config => config.zkSslTrustStorePassword, new Password(passwordPropValueToSet)) } @Test def testZkSslKeyStoreType(): Unit = { - testZkConfig(KafkaConfig.ZkSslKeyStoreTypeProp, "zookeeper.ssl.keystore.type", + testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, "zookeeper.ssl.keystore.type", "zookeeper.ssl.keyStore.type", stringPropValueToSet, config => config.zkSslKeyStoreType, stringPropValueToSet) } @Test def testZkSslTrustStoreType(): Unit = { - testZkConfig(KafkaConfig.ZkSslTrustStoreTypeProp, "zookeeper.ssl.truststore.type", + testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, "zookeeper.ssl.truststore.type", "zookeeper.ssl.trustStore.type", 
stringPropValueToSet, config => config.zkSslTrustStoreType, stringPropValueToSet) } @Test def testZkSslProtocol(): Unit = { - testZkConfig(KafkaConfig.ZkSslProtocolProp, "zookeeper.ssl.protocol", + testZkConfig(ZkConfigs.ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol", "zookeeper.ssl.protocol", stringPropValueToSet, config => Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2")) } @Test def testZkSslEnabledProtocols(): Unit = { - testZkConfig(KafkaConfig.ZkSslEnabledProtocolsProp, "zookeeper.ssl.enabled.protocols", + testZkConfig(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, "zookeeper.ssl.enabled.protocols", "zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","), config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava) } @Test def testZkSslCipherSuites(): Unit = { - testZkConfig(KafkaConfig.ZkSslCipherSuitesProp, "zookeeper.ssl.cipher.suites", + testZkConfig(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, "zookeeper.ssl.cipher.suites", "zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config => config.ZkSslCipherSuites, listPropValueToSet.asJava) } @@ -294,7 +295,7 @@ class KafkaTest { def testZkSslEndpointIdentificationAlgorithm(): Unit = { // this property is different than the others // because the system property values and the Kafka property values don't match - val kafkaPropName = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp + val kafkaPropName = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP assertEquals("zookeeper.ssl.endpoint.identification.algorithm", kafkaPropName) val sysProp = "zookeeper.ssl.hostnameVerification" val expectedDefaultValue = "HTTPS" @@ -327,13 +328,13 @@ class KafkaTest { @Test def testZkSslCrlEnable(): Unit = { - testZkConfig(KafkaConfig.ZkSslCrlEnableProp, "zookeeper.ssl.crl.enable", + testZkConfig(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl.enable", "zookeeper.ssl.crl", booleanPropValueToSet, config => Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false)) } @Test 
def testZkSslOcspEnable(): Unit = { - testZkConfig(KafkaConfig.ZkSslOcspEnableProp, "zookeeper.ssl.ocsp.enable", + testZkConfig(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, "zookeeper.ssl.ocsp.enable", "zookeeper.ssl.ocsp", booleanPropValueToSet, config => Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false)) } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index d6718e4203c..2730431565e 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -34,6 +34,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_0_10_2_IV0, IBP_0_9_0, IBP_1_0_IV0, IBP_2_2_IV0, IBP_2_4_IV0, IBP_2_4_IV1, IBP_2_6_IV0, IBP_2_8_IV1, IBP_3_2_IV0, IBP_3_4_IV0} +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -895,7 +896,7 @@ class ControllerChannelManagerTest { private def createConfig(interBrokerVersion: MetadataVersion): KafkaConfig = { val props = new Properties() props.put(KafkaConfig.BrokerIdProp, controllerId.toString) - props.put(KafkaConfig.ZkConnectProp, "zkConnect") + props.put(ZkConfigs.ZK_CONNECT_PROP, "zkConnect") TestUtils.setIbpAndMessageFormatVersions(props, interBrokerVersion) KafkaConfig.fromProps(props) } diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala index 80790ab905f..29907f22068 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala @@ -41,6 +41,7 @@ import 
org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1} +import org.apache.kafka.server.config.ZkConfigs import org.apache.zookeeper.client.ZKClientConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} @@ -869,7 +870,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(noTlsProps), noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala) - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { propName => assertNull(zkClientConfig.getProperty(propName)) } } @@ -879,27 +880,27 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val props = new java.util.Properties() val kafkaValue = "kafkaValue" val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it - KafkaConfig.ZkSslClientEnableProp -> "true", - KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue, - KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue, - KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue, - KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue, - KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue, - KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue, - KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue, - KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue, - KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue) + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true", + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue, + 
ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue) configs.foreach { case (key, value) => props.put(key, value) } val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*)) // confirm we get all the values we expect - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match { - case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match { + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) - case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => + case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) - case KafkaConfig.ZkSslProtocolProp => + case ZkConfigs.ZK_SSL_PROTOCOL_PROP => assertEquals("TLSv1.2", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) }) @@ -910,29 +911,29 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val props = new java.util.Properties() val kafkaValue = "kafkaValue" val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it - KafkaConfig.ZkSslClientEnableProp -> "true", - KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue, - KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue, - 
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue, - KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue, - KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue, - KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue, - KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue, - KafkaConfig.ZkSslProtocolProp -> kafkaValue, - KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue, - KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue, - KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS", - KafkaConfig.ZkSslCrlEnableProp -> "false", - KafkaConfig.ZkSslOcspEnableProp -> "false") + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true", + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS", + ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false", + ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false") configs.foreach{case (key, value) => props.put(key, value.toString) } val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*)) // confirm we get all the values we expect - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match { - case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match { + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) - case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => + case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) }) @@ -945,43 +946,43 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { val prefixedValue = "prefixedValue" val prefix = "authorizer." val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it - KafkaConfig.ZkSslClientEnableProp -> "false", - KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue, - KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue, - KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue, - KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue, - KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue, - KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue, - KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue, - KafkaConfig.ZkSslProtocolProp -> kafkaValue, - KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue, - KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue, - KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS", - KafkaConfig.ZkSslCrlEnableProp -> "false", - KafkaConfig.ZkSslOcspEnableProp -> "false", - prefix + KafkaConfig.ZkSslClientEnableProp -> "true", - prefix + KafkaConfig.ZkClientCnxnSocketProp -> prefixedValue, - prefix + KafkaConfig.ZkSslKeyStoreLocationProp -> prefixedValue, - prefix + KafkaConfig.ZkSslKeyStorePasswordProp -> prefixedValue, - prefix + KafkaConfig.ZkSslKeyStoreTypeProp -> prefixedValue, - prefix + KafkaConfig.ZkSslTrustStoreLocationProp -> prefixedValue, - prefix + KafkaConfig.ZkSslTrustStorePasswordProp -> prefixedValue, - prefix + KafkaConfig.ZkSslTrustStoreTypeProp -> prefixedValue, - prefix + 
KafkaConfig.ZkSslProtocolProp -> prefixedValue, - prefix + KafkaConfig.ZkSslEnabledProtocolsProp -> prefixedValue, - prefix + KafkaConfig.ZkSslCipherSuitesProp -> prefixedValue, - prefix + KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "", - prefix + KafkaConfig.ZkSslCrlEnableProp -> "true", - prefix + KafkaConfig.ZkSslOcspEnableProp -> "true") + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "false", + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue, + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS", + ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false", + ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false", + prefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true", + prefix + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_PROTOCOL_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> prefixedValue, + prefix + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "", + prefix + ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "true", + 
prefix + ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "true") configs.foreach{case (key, value) => props.put(key, value.toString) } val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap( KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*)) // confirm we get all the values we expect - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match { - case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match { + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) - case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) case _ => assertEquals(prefixedValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>")) }) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 4e1c8eed276..8e7fe06ef2e 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.server.authorizer._ -import org.apache.kafka.server.config.Defaults +import org.apache.kafka.server.config.{Defaults, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.KafkaYammerMetrics import 
org.apache.kafka.server.util.KafkaScheduler @@ -211,7 +211,7 @@ class DynamicBrokerConfigTest { val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix) - val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181") + val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_PROP -> "somehost:2181") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps) // Test update of configs with invalid type @@ -709,7 +709,7 @@ class DynamicBrokerConfigTest { @Test def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = { val props = new Properties() - props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.put(KafkaConfig.MetadataLogSegmentMinBytesProp, "1024") val config = new KafkaConfig(props) assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp)) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index a5d4d961fe1..fd4377f658e 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.Node import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1} -import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.config.{ServerTopicConfigSynonyms, ZkConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.junit.jupiter.api.function.Executable @@ -155,7 +155,7 @@ class KafkaConfigTest { val hostName = "fake-host" val props = 
new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port") val serverConfig = KafkaConfig.fromProps(props) @@ -186,7 +186,7 @@ class KafkaConfigTest { def testDuplicateListeners(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") // listeners with duplicate port props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091") @@ -212,7 +212,7 @@ class KafkaConfigTest { def testIPv4AndIPv6SamePortListeners(): Unit = { val props = new Properties() props.put(KafkaConfig.BrokerIdProp, "1") - props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092") var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) @@ -452,7 +452,7 @@ class KafkaConfigTest { def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092") assertBadConfigContainingMessage(props, "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER") @@ -465,7 +465,7 @@ class KafkaConfigTest { def testBadListenerProtocol(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + 
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091") assertFalse(isValidKafkaConfig(props)) @@ -475,7 +475,7 @@ class KafkaConfigTest { def testListenerNamesWithAdvertisedListenerUnset(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093") props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT") @@ -499,7 +499,7 @@ class KafkaConfigTest { def testListenerAndAdvertisedListenerNames(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093") props.setProperty(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093") @@ -530,7 +530,7 @@ class KafkaConfigTest { def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092") props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL") @@ -541,7 +541,7 @@ class KafkaConfigTest { def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + 
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091") props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION") @@ -552,7 +552,7 @@ class KafkaConfigTest { def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091") props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL") @@ -564,7 +564,7 @@ class KafkaConfigTest { def testCaseInsensitiveListenerProtocol(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") props.setProperty(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092") val config = KafkaConfig.fromProps(props) assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString)) @@ -579,7 +579,7 @@ class KafkaConfigTest { def testListenerDefaults(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") // configuration with no listeners val conf = KafkaConfig.fromProps(props) @@ -593,7 +593,7 @@ class KafkaConfigTest { def testVersionConfiguration(): Unit = { val props = new Properties() props.setProperty(KafkaConfig.BrokerIdProp, "1") - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") val conf = KafkaConfig.fromProps(props) assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion) 
@@ -766,7 +766,7 @@ class KafkaConfigTest { def testFromPropsInvalid(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() - validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181") validRequiredProperties } // to ensure a basis is valid - bootstraps all needed validation @@ -774,25 +774,25 @@ class KafkaConfigTest { KafkaConfig.configNames.foreach { name => name match { - case KafkaConfig.ZkConnectProp => // ignore string - case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") - case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") - case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") - case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") - case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") - case KafkaConfig.ZkClientCnxnSocketProp => //ignore string - case KafkaConfig.ZkSslKeyStoreLocationProp => //ignore string - case KafkaConfig.ZkSslKeyStorePasswordProp => //ignore string - case KafkaConfig.ZkSslKeyStoreTypeProp => //ignore string - case KafkaConfig.ZkSslTrustStoreLocationProp => //ignore string - case KafkaConfig.ZkSslTrustStorePasswordProp => //ignore string - case KafkaConfig.ZkSslTrustStoreTypeProp => //ignore string - case KafkaConfig.ZkSslProtocolProp => //ignore string - case KafkaConfig.ZkSslEnabledProtocolsProp => //ignore string - case KafkaConfig.ZkSslCipherSuitesProp => //ignore string - case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore string - case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean") - case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(baseProperties, 
name, "not_a_boolean") + case ZkConfigs.ZK_CONNECT_PROP => // ignore string + case ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number") + case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number") + case ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP => //ignore string + case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP => //ignore string + case ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP => //ignore string + case ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP => //ignore string + case ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP => //ignore string + case ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP => //ignore string + case ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP => //ignore string + case ZkConfigs.ZK_SSL_PROTOCOL_PROP => //ignore string + case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP => //ignore string + case ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => //ignore string + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => //ignore string + case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean") + case ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean") case KafkaConfig.BrokerIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") @@ -1051,7 +1051,7 @@ class KafkaConfigTest { def testDynamicLogConfigs(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() - validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, 
"127.0.0.1:2181") + validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181") validRequiredProperties } @@ -1141,9 +1141,9 @@ class KafkaConfigTest { @Test def testSpecificProperties(): Unit = { val defaults = new Properties() - defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + defaults.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181") // For ZkConnectionTimeoutMs - defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234") + defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, "1234") defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false") defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1") defaults.setProperty(KafkaConfig.BrokerIdProp, "1") @@ -1187,7 +1187,7 @@ class KafkaConfigTest { @Test def testNonroutableAdvertisedListeners(): Unit = { val props = new Properties() - props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181") props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092") assertFalse(isValidKafkaConfig(props)) } @@ -1601,7 +1601,7 @@ class KafkaConfigTest { @Test def testSaslJwksEndpointRetryDefaults(): Unit = { val props = new Properties() - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") val config = KafkaConfig.fromProps(props) assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp)) assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp)) @@ -1769,7 +1769,7 @@ class KafkaConfigTest { "If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.", assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage) - props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181") + props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181") KafkaConfig.fromProps(props) } diff 
--git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala index bf6312f8631..035f60dab1d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -25,8 +25,11 @@ import org.junit.jupiter.api.Test import java.util.Properties import java.net.{InetAddress, ServerSocket} import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig +import scala.jdk.CollectionConverters._ + class KafkaServerTest extends QuorumTestHarness { @Test @@ -64,7 +67,7 @@ class KafkaServerTest extends QuorumTestHarness { @Test def testCreatesProperZkConfigWhenSaslDisabled(): Unit = { val props = new Properties - props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out + props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) assertEquals("false", zkClientConfig.getProperty(JaasUtils.ZK_SASL_CLIENT)) } @@ -72,10 +75,10 @@ class KafkaServerTest extends QuorumTestHarness { @Test def testCreatesProperZkTlsConfigWhenDisabled(): Unit = { val props = new Properties - props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out - props.put(KafkaConfig.ZkSslClientEnableProp, "false") + props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out + props.put(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "false") val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { propName => assertNull(zkClientConfig.getProperty(propName)) } } @@ -83,51 
+86,51 @@ class KafkaServerTest extends QuorumTestHarness { @Test def testCreatesProperZkTlsConfigWithTrueValues(): Unit = { val props = new Properties - props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out + props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out // should get correct config for all properties if TLS is enabled val someValue = "some_value" def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match { - case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true" - case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "HTTPS" + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true" + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "HTTPS" case _ => someValue } - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp))) + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp))) val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) // now check to make sure the values were set correctly def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match { - case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true" - case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "true" + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true" + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "true" case _ => someValue } - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => - assertEquals(zkClientValueToExpect(kafkaProp), 
zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp)))) + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => + assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp)))) } @Test def testCreatesProperZkTlsConfigWithFalseAndListValues(): Unit = { val props = new Properties - props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out + props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out // should get correct config for all properties if TLS is enabled val someValue = "some_value" def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match { - case KafkaConfig.ZkSslClientEnableProp => "true" - case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false" - case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "" - case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B" + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true" + case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false" + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "" + case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B" case _ => someValue } - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp))) + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp))) val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)) // now check to make sure the values were set correctly def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match { - case KafkaConfig.ZkSslClientEnableProp => "true" - case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => 
"false" - case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "false" - case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B" + case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true" + case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false" + case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "false" + case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B" case _ => someValue } - KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => - assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp)))) + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => + assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp)))) } @Test diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 844701cc371..0e04bad7c8e 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer} import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.BrokerState +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.function.Executable @@ -150,8 +151,8 @@ class ServerShutdownTest extends KafkaServerTestHarness { shutdownKRaftController() verifyCleanShutdownAfterFailedStartup[CancellationException] } else { - 
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50") - propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535") + propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "50") + propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECT_PROP, "some.invalid.hostname.foo.bar.local:65535") verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException] } } diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala index d72ad2d7bcd..fb80c4b890d 100644 --- a/core/src/test/scala/unit/kafka/server/ServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala @@ -20,6 +20,7 @@ import java.util.Properties import org.apache.kafka.common.Uuid import org.apache.kafka.common.metrics.MetricsContext +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -54,7 +55,7 @@ class ServerTest { val props = new Properties() props.put(KafkaConfig.BrokerIdProp, brokerId.toString) - props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:0") + props.put(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:0") val config = KafkaConfig.fromProps(props) val context = Server.createKafkaMetricsContext(config, clusterId) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 44243d39ce8..33a4b27fd8b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -74,7 +74,7 @@ import org.apache.kafka.metadata.properties.MetaProperties import org.apache.kafka.server.ControllerRequestCompletionHandler import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer} import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} -import org.apache.kafka.server.config.Defaults +import 
org.apache.kafka.server.config.{Defaults, ZkConfigs} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} @@ -359,8 +359,8 @@ object TestUtils extends Logging { // controllerQuorumVotersFuture instead. props.put(KafkaConfig.QuorumVotersProp, "1000@localhost:0") } else { - props.put(KafkaConfig.ZkConnectProp, zkConnect) - props.put(KafkaConfig.ZkConnectionTimeoutMsProp, "10000") + props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) + props.put(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "10000") } props.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "1500") props.put(KafkaConfig.ControllerSocketTimeoutMsProp, "1500") diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index a1a45784ad7..aa97f6831c3 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigType, ZkConfigs} import org.apache.kafka.storage.internals.log.LogConfig import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException} import org.apache.zookeeper.{CreateMode, ZooDefs} @@ -106,7 +106,7 @@ class KafkaZkClientTest extends QuorumTestHarness { // TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support // to kafka.zk.EmbeddedZookeeper val clientConfig = new ZKClientConfig() - val propKey = KafkaConfig.ZkClientCnxnSocketProp + val propKey = 
ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty" KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal) val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout, diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala index 569cb5764b4..b4dccaecb0a 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala @@ -21,6 +21,7 @@ import kafka.zk.ZkMigrationClient import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState import org.apache.kafka.security.PasswordEncoder +import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{BeforeEach, TestInfo} import java.util.Properties @@ -40,7 +41,7 @@ class ZkMigrationTestHarness extends QuorumTestHarness { val encoder: PasswordEncoder = { val encoderProps = new Properties() - encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation + encoderProps.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:1234") // Get around the config validation encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the val encoderConfig = new KafkaConfig(encoderProps) PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get, diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index ebb6ccf4453..affe6c3839c 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -27,6 +27,7 @@ import kafka.utils.TestUtils import kafka.server.QuorumTestHarness import org.apache.kafka.common.security.JaasUtils 
import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.zookeeper.KeeperException.{Code, NoNodeException} import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} @@ -102,7 +103,7 @@ class ZooKeeperClientTest extends QuorumTestHarness { // TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support // to kafka.zk.EmbeddedZookeeper val clientConfig = new ZKClientConfig() - val propKey = KafkaConfig.ZkClientCnxnSocketProp + val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty" KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal) val client = newZooKeeperClient(clientConfig = clientConfig) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 75fcf3f24b6..f4d17ed0740 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupCoordinator; import org.apache.kafka.server.common.Features; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ZkConfigs; import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -176,7 +177,7 @@ public class MetadataRequestBenchmark { private KafkaApis createKafkaApis() { Properties kafkaProps = new Properties(); - kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk"); + kafkaProps.put(ZkConfigs.ZK_CONNECT_PROP, "zk"); kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + ""); KafkaConfig config = new 
KafkaConfig(kafkaProps); return new KafkaApisBuilder(). diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java index 0c7ca123b87..5b425c0dc49 100644 --- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java +++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java @@ -44,16 +44,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Defaults { - /** ********* Zookeeper Configuration *********/ - public static final int ZK_SESSION_TIMEOUT_MS = 18000; - public static final boolean ZK_ENABLE_SECURE_ACLS = false; - public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; - public static final boolean ZK_SSL_CLIENT_ENABLE = false; - public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; - public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; - public static final boolean ZK_SSL_CRL_ENABLE = false; - public static final boolean ZK_SSL_OCSP_ENABLE = false; - /** ********* General Configuration *********/ public static final boolean BROKER_ID_GENERATION_ENABLE = true; public static final int MAX_RESERVED_BROKER_ID = 1000; diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java new file mode 100644 index 00000000000..2ddd2ef2145 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public final class ZkConfigs { + /** ********* Zookeeper Configuration ***********/ + public static final String ZK_CONNECT_PROP = "zookeeper.connect"; + public static final String ZK_SESSION_TIMEOUT_MS_PROP = "zookeeper.session.timeout.ms"; + public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = "zookeeper.connection.timeout.ms"; + public static final String ZK_ENABLE_SECURE_ACLS_PROP = "zookeeper.set.acl"; + public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = "zookeeper.max.in.flight.requests"; + public static final String ZK_SSL_CLIENT_ENABLE_PROP = "zookeeper.ssl.client.enable"; + public static final String ZK_CLIENT_CNXN_SOCKET_PROP = "zookeeper.clientCnxnSocket"; + public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = "zookeeper.ssl.keystore.location"; + public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = "zookeeper.ssl.keystore.password"; + public static final String ZK_SSL_KEY_STORE_TYPE_PROP = "zookeeper.ssl.keystore.type"; + public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = "zookeeper.ssl.truststore.location"; + public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = "zookeeper.ssl.truststore.password"; + public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = "zookeeper.ssl.truststore.type"; + public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol"; + public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = 
"zookeeper.ssl.enabled.protocols"; + public static final String ZK_SSL_CIPHER_SUITES_PROP = "zookeeper.ssl.cipher.suites"; + public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = "zookeeper.ssl.endpoint.identification.algorithm"; + public static final String ZK_SSL_CRL_ENABLE_PROP = "zookeeper.ssl.crl.enable"; + public static final String ZK_SSL_OCSP_ENABLE_PROP = "zookeeper.ssl.ocsp.enable"; + + public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " + + "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " + + "down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" + + "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " + + "For example to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>."; + public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout"; + public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. 
If not set, the value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used"; + public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs"; + public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking."; + public static final String ZK_SSL_CLIENT_ENABLE_DOC; + public static final String ZK_CLIENT_CNXN_SOCKET_DOC; + public static final String ZK_SSL_KEY_STORE_LOCATION_DOC; + public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC; + public static final String ZK_SSL_KEY_STORE_TYPE_DOC; + public static final String ZK_SSL_TRUST_STORE_LOCATION_DOC; + public static final String ZK_SSL_TRUST_STORE_PASSWORD_DOC; + public static final String ZK_SSL_TRUST_STORE_TYPE_DOC; + public static final String ZK_SSL_PROTOCOL_DOC; + public static final String ZK_SSL_ENABLED_PROTOCOLS_DOC; + public static final String ZK_SSL_CIPHER_SUITES_DOC; + public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC; + public static final String ZK_SSL_CRL_ENABLE_DOC; + public static final String ZK_SSL_OCSP_ENABLE_DOC; + + // a map from the Kafka config to the corresponding ZooKeeper Java system property + public static final Map<String, String> ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP; + + public static final int ZK_SESSION_TIMEOUT_MS = 18000; + public static final boolean ZK_ENABLE_SECURE_ACLS = false; + public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; + public static final boolean ZK_SSL_CLIENT_ENABLE = false; + public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; + public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; + public static final boolean ZK_SSL_CRL_ENABLE = false; + public static final boolean ZK_SSL_OCSP_ENABLE = false; + + // See ZKClientConfig.SECURE_CLIENT + private static final String SECURE_CLIENT = "zookeeper.client.secure"; + // See ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET + private static final String 
ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket"; + + static { + Map<String, String> zkSslConfigToSystemPropertyMap = new HashMap<>(); + + zkSslConfigToSystemPropertyMap.put(ZK_SSL_CLIENT_ENABLE_PROP, SECURE_CLIENT); + zkSslConfigToSystemPropertyMap.put(ZK_CLIENT_CNXN_SOCKET_PROP, ZOOKEEPER_CLIENT_CNXN_SOCKET); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_LOCATION_PROP, "zookeeper.ssl.keyStore.location"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_PASSWORD_PROP, "zookeeper.ssl.keyStore.password"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_TYPE_PROP, "zookeeper.ssl.keyStore.type"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_LOCATION_PROP, "zookeeper.ssl.trustStore.location"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_PASSWORD_PROP, "zookeeper.ssl.trustStore.password"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_TYPE_PROP, "zookeeper.ssl.trustStore.type"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENABLED_PROTOCOLS_PROP, "zookeeper.ssl.enabledProtocols"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_CIPHER_SUITES_PROP, "zookeeper.ssl.ciphersuites"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, "zookeeper.ssl.hostnameVerification"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl"); + zkSslConfigToSystemPropertyMap.put(ZK_SSL_OCSP_ENABLE_PROP, "zookeeper.ssl.ocsp"); + + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Collections.unmodifiableMap(zkSslConfigToSystemPropertyMap); + + ZK_SSL_CLIENT_ENABLE_DOC = "Set client to use TLS when connecting to ZooKeeper." + + " An explicit value overrides any value set via the <code>zookeeper.client.secure</code> system property (note the different name)." 
+ + " Defaults to false if neither is set; when true, <code>" + ZK_CLIENT_CNXN_SOCKET_PROP + "</code> must be set (typically to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set may include " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.keySet().stream().filter(x -> !x.equals(ZK_SSL_CLIENT_ENABLE_PROP) && !x.equals(ZK_CLIENT_CNXN_SOCKET_PROP)).sorted().collect(Collectors.joining("</code>, <code>", "<code>", "</code>")); + ZK_CLIENT_CNXN_SOCKET_DOC = "Typically set to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the same-named <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_CLIENT_CNXN_SOCKET_PROP) + "</code> system property."; + ZK_SSL_KEY_STORE_LOCATION_DOC = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_LOCATION_PROP) + "</code> system property (note the camelCase)."; + ZK_SSL_KEY_STORE_PASSWORD_DOC = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_PASSWORD_PROP) + "</code> system property (note the camelCase)." + + " Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail."; + ZK_SSL_KEY_STORE_TYPE_DOC = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_TYPE_PROP) + "</code> system property (note the camelCase)." 
+ + " The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the keystore."; + ZK_SSL_TRUST_STORE_LOCATION_DOC = "Truststore location when using TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_LOCATION_PROP) + "</code> system property (note the camelCase)."; + ZK_SSL_TRUST_STORE_PASSWORD_DOC = "Truststore password when using TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_PASSWORD_PROP) + "</code> system property (note the camelCase)."; + ZK_SSL_TRUST_STORE_TYPE_DOC = "Truststore type when using TLS connectivity to ZooKeeper." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_TYPE_PROP) + "</code> system property (note the camelCase)." + + " The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the truststore."; + ZK_SSL_PROTOCOL_DOC = "Specifies the protocol to be used in ZooKeeper TLS negotiation." + + " An explicit value overrides any value set via the same-named <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_PROTOCOL_PROP) + "</code> system property."; + ZK_SSL_ENABLED_PROTOCOLS_DOC = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENABLED_PROTOCOLS_PROP) + "</code> system property (note the camelCase)." + + " The default value of <code>null</code> means the enabled protocol will be the value of the <code>" + ZK_SSL_PROTOCOL_PROP + "</code> configuration property."; + ZK_SSL_CIPHER_SUITES_DOC = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." 
+ + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CIPHER_SUITES_PROP) + "</code> system property (note the single word \"ciphersuites\")." + + " The default value of <code>null</code> means the list of enabled cipher suites is determined by the Java runtime being used."; + ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." + + " An explicit value overrides any \"true\" or \"false\" value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP) + "</code> system property (note the different name and values; true implies https and false implies blank)."; + ZK_SSL_CRL_ENABLE_DOC = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." + + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CRL_ENABLE_PROP) + "</code> system property (note the shorter name)."; + ZK_SSL_OCSP_ENABLE_DOC = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." 
+ + " Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_OCSP_ENABLE_PROP) + "</code> system property (note the shorter name)."; + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 4232e1d74c9..89af1d5729e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.server.config.ConfigType; +import org.apache.kafka.server.config.ZkConfigs; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.test.TestCondition; @@ -109,7 +110,7 @@ public class EmbeddedKafkaCluster { zookeeper = new EmbeddedZookeeper(); log.debug("ZooKeeper instance is running at {}", zKConnectString()); - brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString()); + brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString()); putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT); putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true); putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 7f945ffe06d..4b57d576ae1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.config.ZkConfigs; import org.apache.kafka.server.util.MockTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,7 @@ public class KafkaEmbedded { effectiveConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), true); effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000); effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true); - effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000); + effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, 10000); effectiveConfig.putAll(initialConfig); effectiveConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());