This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f38fe5e0a6 KAFKA-14588 ZK configuration moved to ZkConfig (#15075)
6f38fe5e0a6 is described below

commit 6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45
Author: Nikolay <nizhi...@apache.org>
AuthorDate: Wed Mar 27 17:37:01 2024 +0300

    KAFKA-14588 ZK configuration moved to ZkConfig (#15075)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 build.gradle                                       |   5 +-
 checkstyle/import-control.xml                      |   1 +
 .../util/clusters/EmbeddedKafkaCluster.java        |   3 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   |   8 +-
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   4 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala     |   7 +-
 .../kafka/security/authorizer/AclAuthorizer.scala  |   7 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 179 ++++++---------------
 core/src/main/scala/kafka/server/KafkaServer.scala |  30 ++--
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   8 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   3 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |   7 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |   3 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   4 +-
 .../kafka/api/SaslMultiMechanismConsumerTest.scala |   4 +-
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |   4 +-
 .../kafka/api/SaslSslAdminIntegrationTest.scala    |   3 +-
 .../kafka/api/SaslSslConsumerTest.scala            |   4 +-
 .../kafka/api/SslAdminIntegrationTest.scala        |   5 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   4 +-
 .../server/KafkaServerKRaftRegistrationTest.scala  |   5 +-
 ...ListenersWithSameSecurityProtocolBaseTest.scala |   3 +-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  16 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala    |  29 ++--
 .../controller/ControllerChannelManagerTest.scala  |   3 +-
 .../kafka/security/authorizer/AuthorizerTest.scala | 127 +++++++--------
 .../kafka/server/DynamicBrokerConfigTest.scala     |   6 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  80 ++++-----
 .../scala/unit/kafka/server/KafkaServerTest.scala  |  51 +++---
 .../unit/kafka/server/ServerShutdownTest.scala     |   5 +-
 .../test/scala/unit/kafka/server/ServerTest.scala  |   3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   6 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |   4 +-
 .../zk/migration/ZkMigrationTestHarness.scala      |   3 +-
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala |   3 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   3 +-
 .../org/apache/kafka/server/config/Defaults.java   |  10 --
 .../org/apache/kafka/server/config/ZkConfigs.java  | 145 +++++++++++++++++
 .../integration/utils/EmbeddedKafkaCluster.java    |   3 +-
 .../streams/integration/utils/KafkaEmbedded.java   |   3 +-
 40 files changed, 439 insertions(+), 362 deletions(-)

diff --git a/build.gradle b/build.gradle
index 489473c885a..0564d27a013 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2160,6 +2160,7 @@ project(':streams') {
     testImplementation project(':storage')
     testImplementation project(':server-common')
     testImplementation project(':server-common').sourceSets.test.output
+    testImplementation project(':server')
     testImplementation libs.log4j
     testImplementation libs.junitJupiter
     testImplementation libs.junitVintageEngine
@@ -2742,6 +2743,7 @@ project(':jmh-benchmarks') {
       exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
     }
     implementation project(':server-common')
+    implementation project(':server')
     implementation project(':clients')
     implementation project(':group-coordinator')
     implementation project(':metadata')
@@ -2899,7 +2901,7 @@ project(':connect:json') {
     api libs.jacksonAfterburner
 
     implementation libs.slf4jApi
-    
+
     testImplementation libs.junitJupiter
 
     testRuntimeOnly libs.slf4jlog4j
@@ -2969,6 +2971,7 @@ project(':connect:runtime') {
     testImplementation project(':metadata')
     testImplementation project(':core').sourceSets.test.output
     testImplementation project(':server-common')
+    testImplementation project(':server')
     testImplementation project(':storage')
     testImplementation project(':connect:test-plugins')
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8bbf5728212..616e0e519c6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -605,6 +605,7 @@
         <allow pkg="org.apache.kafka.metadata" />
         <allow pkg="org.eclipse.jetty.client"/>
         <allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
+        <allow class="org.apache.kafka.server.config.ZkConfigs" />
       </subpackage>
     </subpackage>
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index c15aa27ae59..8a06c8e0b0a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.config.ZkConfigs;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,7 +155,7 @@ public class EmbeddedKafkaCluster {
     }
 
     private void doStart() {
-        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString());
 
         putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
index 2e867d76dd2..7120d0d8c34 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 import joptsimple._
 import joptsimple.util.EnumConverter
 import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.acl._
@@ -33,6 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
 import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
@@ -200,7 +200,7 @@ object AclCommand extends Logging {
       // We will default the value of zookeeper.set.acl to true or false based 
on whether SASL is configured,
       // but if SASL is not configured and zookeeper.set.acl is supposed to be 
true due to mutual certificate authentication
       // then it will be up to the user to explicitly specify 
zookeeper.set.acl=true in the authorizer-properties.
-      val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> 
JaasUtils.isZkSaslEnabled)
+      val defaultProps = Map(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP -> 
JaasUtils.isZkSaslEnabled)
       val authorizerPropertiesWithoutTls =
         if (opts.options.has(opts.authorizerPropertiesOpt)) {
           val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt)
@@ -211,7 +211,7 @@ object AclCommand extends Logging {
       val authorizerProperties =
         if (opts.options.has(opts.zkTlsConfigFile)) {
           // load in TLS configs both with and without the "authorizer." prefix
-          val validKeys = 
(KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList ++ 
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.map("authorizer." + 
_).toList).asJava
+          val validKeys = 
(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList ++ 
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.map("authorizer." + 
_).toList).asJava
           authorizerPropertiesWithoutTls ++ 
Utils.loadProps(opts.options.valueOf(opts.zkTlsConfigFile), 
validKeys).asInstanceOf[java.util.Map[String, Any]].asScala
         }
         else
@@ -619,7 +619,7 @@ object AclCommand extends Logging {
       "DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity 
properties are defined for" +
         " the default authorizer kafka.security.authorizer.AclAuthorizer." +
         " Any properties other than the following (with or without an 
\"authorizer.\" prefix) are ignored: " +
-        
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") +
+        
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(",
 ") +
         ". Note that if SASL is not configured and zookeeper.set.acl is 
supposed to be true due to mutual certificate authentication being used" +
         " then it is necessary to explicitly specify --authorizer-properties 
zookeeper.set.acl=true. " +
         AclCommand.AuthorizerDeprecationMessage)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index d03d7dfb9b5..a10722f000f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity,
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, 
ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
-import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
+import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals, 
ZkConfigs}
 import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
@@ -864,7 +864,7 @@ object ConfigCommand extends Logging {
       .ofType(classOf[String])
     val zkTlsConfigFile: OptionSpec[String] = 
parser.accepts("zk-tls-config-file",
       "Identifies the file where ZooKeeper client TLS connectivity properties 
are defined.  Any properties other than " +
-        
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + 
" are ignored.")
+        
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(",
 ") + " are ignored.")
       .withRequiredArg().describedAs("ZooKeeper TLS 
configuration").ofType(classOf[String])
     options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala 
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 9c81bb23e2e..dd07f522e77 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -24,6 +24,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, 
ZkSecurityMigratorUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
 import org.apache.zookeeper.KeeperException
@@ -81,7 +82,7 @@ object ZkSecurityMigrator extends Logging {
     if (jaasFile == null && !tlsClientAuthEnabled) {
       val errorMsg = s"No JAAS configuration file has been specified and no 
TLS client certificate has been specified. Please make sure that you set " +
         s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide 
a ZooKeeper client TLS configuration via --$tlsConfigFileOption <filename> " +
-        s"identifying at least ${KafkaConfig.ZkSslClientEnableProp}, 
${KafkaConfig.ZkClientCnxnSocketProp}, and 
${KafkaConfig.ZkSslKeyStoreLocationProp}"
+        s"identifying at least ${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, 
${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and 
${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP}"
       System.err.println("ERROR: %s".format(errorMsg))
       throw new IllegalArgumentException("Incorrect configuration")
     }
@@ -124,7 +125,7 @@ object ZkSecurityMigrator extends Logging {
   }
 
   def createZkClientConfigFromFile(filename: String) : ZKClientConfig = {
-    val zkTlsConfigFileProps = Utils.loadProps(filename, 
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.asJava)
+    val zkTlsConfigFileProps = Utils.loadProps(filename, 
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.asJava)
     val zkClientConfig = new ZKClientConfig() // Initializes based on any 
system properties that have been set
     // Now override any set system properties with explicitly-provided values 
from the config file
     // Emit INFO logs due to camel-case property names encouraging mistakes -- 
help people see mistakes they make
@@ -156,7 +157,7 @@ object ZkSecurityMigrator extends Logging {
       "before migration. If not, exit the command.")
     val zkTlsConfigFile: OptionSpec[String] = 
parser.accepts(tlsConfigFileOption,
       "Identifies the file where ZooKeeper client TLS connectivity properties 
are defined.  Any properties other than " +
-        KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.mkString(", ") + " are 
ignored.")
+        
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.mkString(", ") + " 
are ignored.")
       .withRequiredArg().describedAs("ZooKeeper TLS 
configuration").ofType(classOf[String])
     options = parser.parse(args : _*)
   }
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala 
b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 99642b33c2b..dc307ca702f 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import 
org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.zookeeper.client.ZKClientConfig
 
 import scala.annotation.nowarn
@@ -96,7 +97,7 @@ object AclAuthorizer {
   }
 
   private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: 
KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
-    val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + 
KafkaConfig.ZkSslClientEnableProp).
+    val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + 
ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).
       
map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
     if (!zkSslClientEnable)
       new ZKClientConfig
@@ -105,10 +106,10 @@ object AclAuthorizer {
       // be sure to force creation since the zkSslClientEnable property in the 
kafkaConfig could be false
       val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = 
true)
       // add in any prefixed overlays
-      KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, 
sysProp) =>
+      ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { 
(kafkaProp, sysProp) =>
         configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { 
prefixedValue =>
           zkClientConfig.setProperty(sysProp,
-            if (kafkaProp == 
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
+            if (kafkaProp == 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP)
               (prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
             else
               prefixedValue.toString.trim)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4290ce1da8d..cec5f5649f6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{MetadataVersion, 
MetadataVersionValidator}
 import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms, 
ZkConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Csv
@@ -67,53 +67,15 @@ object KafkaConfig {
       DynamicBrokerConfig.dynamicConfigUpdateModes))
   }
 
-  /** ********* Zookeeper Configuration ***********/
-  val ZkConnectProp = "zookeeper.connect"
-  val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
-  val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
-  val ZkEnableSecureAclsProp = "zookeeper.set.acl"
-  val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
-  val ZkSslClientEnableProp = "zookeeper.ssl.client.enable"
-  val ZkClientCnxnSocketProp = "zookeeper.clientCnxnSocket"
-  val ZkSslKeyStoreLocationProp = "zookeeper.ssl.keystore.location"
-  val ZkSslKeyStorePasswordProp = "zookeeper.ssl.keystore.password"
-  val ZkSslKeyStoreTypeProp = "zookeeper.ssl.keystore.type"
-  val ZkSslTrustStoreLocationProp = "zookeeper.ssl.truststore.location"
-  val ZkSslTrustStorePasswordProp = "zookeeper.ssl.truststore.password"
-  val ZkSslTrustStoreTypeProp = "zookeeper.ssl.truststore.type"
-  val ZkSslProtocolProp = "zookeeper.ssl.protocol"
-  val ZkSslEnabledProtocolsProp = "zookeeper.ssl.enabled.protocols"
-  val ZkSslCipherSuitesProp = "zookeeper.ssl.cipher.suites"
-  val ZkSslEndpointIdentificationAlgorithmProp = 
"zookeeper.ssl.endpoint.identification.algorithm"
-  val ZkSslCrlEnableProp = "zookeeper.ssl.crl.enable"
-  val ZkSslOcspEnableProp = "zookeeper.ssl.ocsp.enable"
-
-  // a map from the Kafka config to the corresponding ZooKeeper Java system 
property
-  private[kafka] val ZkSslConfigToSystemPropertyMap: Map[String, String] = Map(
-    ZkSslClientEnableProp -> ZKClientConfig.SECURE_CLIENT,
-    ZkClientCnxnSocketProp -> ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
-    ZkSslKeyStoreLocationProp -> "zookeeper.ssl.keyStore.location",
-    ZkSslKeyStorePasswordProp -> "zookeeper.ssl.keyStore.password",
-    ZkSslKeyStoreTypeProp -> "zookeeper.ssl.keyStore.type",
-    ZkSslTrustStoreLocationProp -> "zookeeper.ssl.trustStore.location",
-    ZkSslTrustStorePasswordProp -> "zookeeper.ssl.trustStore.password",
-    ZkSslTrustStoreTypeProp -> "zookeeper.ssl.trustStore.type",
-    ZkSslProtocolProp -> "zookeeper.ssl.protocol",
-    ZkSslEnabledProtocolsProp -> "zookeeper.ssl.enabledProtocols",
-    ZkSslCipherSuitesProp -> "zookeeper.ssl.ciphersuites",
-    ZkSslEndpointIdentificationAlgorithmProp -> 
"zookeeper.ssl.hostnameVerification",
-    ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
-    ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")
-
   private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, 
kafkaPropName: String): Option[String] = {
-    
Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
+    
Option(clientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName)))
   }
 
   private[kafka] def setZooKeeperClientProperty(clientConfig: ZKClientConfig, 
kafkaPropName: String, kafkaPropValue: Any): Unit = {
-    clientConfig.setProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName),
+    
clientConfig.setProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName),
       kafkaPropName match {
-        case ZkSslEndpointIdentificationAlgorithmProp => 
(kafkaPropValue.toString.toUpperCase == "HTTPS").toString
-        case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => 
kafkaPropValue match {
+        case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => 
(kafkaPropValue.toString.toUpperCase == "HTTPS").toString
+        case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | 
ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => kafkaPropValue match {
           case list: java.util.List[_] => list.asScala.mkString(",")
           case _ => kafkaPropValue.toString
         }
@@ -124,9 +86,9 @@ object KafkaConfig {
   // For ZooKeeper TLS client authentication to be enabled the client must (at 
a minimum) configure itself as using TLS
   // with both a client connection socket and a key store location explicitly 
set.
   private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): 
Boolean = {
-    zooKeeperClientProperty(zkClientConfig, 
ZkSslClientEnableProp).contains("true") &&
-      zooKeeperClientProperty(zkClientConfig, 
ZkClientCnxnSocketProp).isDefined &&
-      zooKeeperClientProperty(zkClientConfig, 
ZkSslKeyStoreLocationProp).isDefined
+    zooKeeperClientProperty(zkClientConfig, 
ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).contains("true") &&
+      zooKeeperClientProperty(zkClientConfig, 
ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP).isDefined &&
+      zooKeeperClientProperty(zkClientConfig, 
ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP).isDefined
   }
 
   /** ********* General Configuration ***********/
@@ -439,51 +401,6 @@ object KafkaConfig {
   val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"
 
   /* Documentation */
-  /** ********* Zookeeper Configuration ***********/
-  val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form 
<code>hostname:port</code> where host and port are the " +
-  "host and port of a ZooKeeper server. To allow connecting through other 
ZooKeeper nodes when that ZooKeeper machine is " +
-  "down you can also specify multiple hosts in the form 
<code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
-  "The server can also have a ZooKeeper chroot path as part of its ZooKeeper 
connection string which puts its data under some path in the global ZooKeeper 
namespace. " +
-  "For example to give a chroot path of <code>/chroot/path</code> you would 
give the connection string as 
<code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>."
-  val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
-  val ZkConnectionTimeoutMsDoc = "The max time that the client waits to 
establish a connection to ZooKeeper. If not set, the value in " + 
ZkSessionTimeoutMsProp + " is used"
-  val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
-  val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged 
requests the client will send to ZooKeeper before blocking."
-  val ZkSslClientEnableDoc = "Set client to use TLS when connecting to 
ZooKeeper." +
-    " An explicit value overrides any value set via the 
<code>zookeeper.client.secure</code> system property (note the different 
name)." +
-    s" Defaults to false if neither is set; when true, 
<code>$ZkClientCnxnSocketProp</code> must be set (typically to 
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set 
may include " +
-    ZkSslConfigToSystemPropertyMap.keys.toList.filter(x => x != 
ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).sorted.mkString("<code>", 
"</code>, <code>", "</code>")
-  val ZkClientCnxnSocketDoc = "Typically set to 
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS 
connectivity to ZooKeeper." +
-    s" Overrides any explicit value set via the same-named 
<code>${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)}</code> system 
property."
-  val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side 
certificate with TLS connectivity to ZooKeeper." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreLocationProp)}</code> 
system property (note the camelCase)."
-  val ZkSslKeyStorePasswordDoc = "Keystore password when using a client-side 
certificate with TLS connectivity to ZooKeeper." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStorePasswordProp)}</code> 
system property (note the camelCase)." +
-    " Note that ZooKeeper does not support a key password different from the 
keystore password, so be sure to set the key password in the keystore to be 
identical to the keystore password; otherwise the connection attempt to 
Zookeeper will fail."
-  val ZkSslKeyStoreTypeDoc = "Keystore type when using a client-side 
certificate with TLS connectivity to ZooKeeper." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreTypeProp)}</code> system 
property (note the camelCase)." +
-    " The default value of <code>null</code> means the type will be 
auto-detected based on the filename extension of the keystore."
-  val ZkSslTrustStoreLocationDoc = "Truststore location when using TLS 
connectivity to ZooKeeper." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreLocationProp)}</code> 
system property (note the camelCase)."
-  val ZkSslTrustStorePasswordDoc = "Truststore password when using TLS 
connectivity to ZooKeeper." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStorePasswordProp)}</code> 
system property (note the camelCase)."
-  val ZkSslTrustStoreTypeDoc = "Truststore type when using TLS connectivity to 
ZooKeeper." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreTypeProp)}</code> system 
property (note the camelCase)." +
-    " The default value of <code>null</code> means the type will be 
auto-detected based on the filename extension of the truststore."
-  val ZkSslProtocolDoc = "Specifies the protocol to be used in ZooKeeper TLS 
negotiation." +
-    s" An explicit value overrides any value set via the same-named 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslProtocolProp)}</code> system 
property."
-  val ZkSslEnabledProtocolsDoc = "Specifies the enabled protocol(s) in 
ZooKeeper TLS negotiation (csv)." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslEnabledProtocolsProp)}</code> 
system property (note the camelCase)." +
-    s" The default value of <code>null</code> means the enabled protocol will 
be the value of the <code>${KafkaConfig.ZkSslProtocolProp}</code> configuration 
property."
-  val ZkSslCipherSuitesDoc = "Specifies the enabled cipher suites to be used 
in ZooKeeper TLS negotiation (csv)." +
-    s""" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslCipherSuitesProp)}</code> system 
property (note the single word \"ciphersuites\").""" +
-    " The default value of <code>null</code> means the list of enabled cipher 
suites is determined by the Java runtime being used."
-  val ZkSslEndpointIdentificationAlgorithmDoc = "Specifies whether to enable 
hostname verification in the ZooKeeper TLS negotiation process, with 
(case-insensitively) \"https\" meaning ZooKeeper hostname verification is 
enabled and an explicit blank value meaning it is disabled (disabling it is 
only recommended for testing purposes)." +
-    s""" An explicit value overrides any \"true\" or \"false\" value set via 
the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslEndpointIdentificationAlgorithmProp)}</code>
 system property (note the different name and values; true implies https and 
false implies blank)."""
-  val ZkSslCrlEnableDoc = "Specifies whether to enable Certificate Revocation 
List in the ZooKeeper TLS protocols." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslCrlEnableProp)}</code> system 
property (note the shorter name)."
-  val ZkSslOcspEnableDoc = "Specifies whether to enable Online Certificate 
Status Protocol in the ZooKeeper TLS protocols." +
-    s" Overrides any explicit value set via the 
<code>${ZkSslConfigToSystemPropertyMap(ZkSslOcspEnableProp)}</code> system 
property (note the shorter name)."
   /** ********* General Configuration ***********/
   val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on 
the server. When enabled the value configured for $MaxReservedBrokerIdProp 
should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@@ -943,25 +860,25 @@ object KafkaConfig {
     new ConfigDef()
 
       /** ********* Zookeeper Configuration ***********/
-      .define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
-      .define(ZkSessionTimeoutMsProp, INT, Defaults.ZK_SESSION_TIMEOUT_MS, 
HIGH, ZkSessionTimeoutMsDoc)
-      .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, 
ZkConnectionTimeoutMsDoc)
-      .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZK_ENABLE_SECURE_ACLS, 
HIGH, ZkEnableSecureAclsDoc)
-      .define(ZkMaxInFlightRequestsProp, INT, 
Defaults.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
-      .define(ZkSslClientEnableProp, BOOLEAN, Defaults.ZK_SSL_CLIENT_ENABLE, 
MEDIUM, ZkSslClientEnableDoc)
-      .define(ZkClientCnxnSocketProp, STRING, null, MEDIUM, 
ZkClientCnxnSocketDoc)
-      .define(ZkSslKeyStoreLocationProp, STRING, null, MEDIUM, 
ZkSslKeyStoreLocationDoc)
-      .define(ZkSslKeyStorePasswordProp, PASSWORD, null, MEDIUM, 
ZkSslKeyStorePasswordDoc)
-      .define(ZkSslKeyStoreTypeProp, STRING, null, MEDIUM, 
ZkSslKeyStoreTypeDoc)
-      .define(ZkSslTrustStoreLocationProp, STRING, null, MEDIUM, 
ZkSslTrustStoreLocationDoc)
-      .define(ZkSslTrustStorePasswordProp, PASSWORD, null, MEDIUM, 
ZkSslTrustStorePasswordDoc)
-      .define(ZkSslTrustStoreTypeProp, STRING, null, MEDIUM, 
ZkSslTrustStoreTypeDoc)
-      .define(ZkSslProtocolProp, STRING, Defaults.ZK_SSL_PROTOCOL, LOW, 
ZkSslProtocolDoc)
-      .define(ZkSslEnabledProtocolsProp, LIST, null, LOW, 
ZkSslEnabledProtocolsDoc)
-      .define(ZkSslCipherSuitesProp, LIST, null, LOW, ZkSslCipherSuitesDoc)
-      .define(ZkSslEndpointIdentificationAlgorithmProp, STRING, 
Defaults.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, 
ZkSslEndpointIdentificationAlgorithmDoc)
-      .define(ZkSslCrlEnableProp, BOOLEAN, Defaults.ZK_SSL_CRL_ENABLE, LOW, 
ZkSslCrlEnableDoc)
-      .define(ZkSslOcspEnableProp, BOOLEAN, Defaults.ZK_SSL_OCSP_ENABLE, LOW, 
ZkSslOcspEnableDoc)
+      .define(ZkConfigs.ZK_CONNECT_PROP, STRING, null, HIGH, 
ZkConfigs.ZK_CONNECT_DOC)
+      .define(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, INT, 
ZkConfigs.ZK_SESSION_TIMEOUT_MS, HIGH, ZkConfigs.ZK_SESSION_TIMEOUT_MS_DOC)
+      .define(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, INT, null, HIGH, 
ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_DOC)
+      .define(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, BOOLEAN, 
ZkConfigs.ZK_ENABLE_SECURE_ACLS, HIGH, ZkConfigs.ZK_ENABLE_SECURE_ACLS_DOC)
+      .define(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP, INT, 
ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, 
ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
+      .define(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, BOOLEAN, 
ZkConfigs.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkConfigs.ZK_SSL_CLIENT_ENABLE_DOC)
+      .define(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, STRING, null, MEDIUM, 
ZkConfigs.ZK_CLIENT_CNXN_SOCKET_DOC)
+      .define(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, STRING, null, MEDIUM, 
ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_DOC)
+      .define(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, PASSWORD, null, 
MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_DOC)
+      .define(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, STRING, null, MEDIUM, 
ZkConfigs.ZK_SSL_KEY_STORE_TYPE_DOC)
+      .define(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, STRING, null, 
MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_DOC)
+      .define(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, PASSWORD, null, 
MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_DOC)
+      .define(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, STRING, null, MEDIUM, 
ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_DOC)
+      .define(ZkConfigs.ZK_SSL_PROTOCOL_PROP, STRING, 
ZkConfigs.ZK_SSL_PROTOCOL, LOW, ZkConfigs.ZK_SSL_PROTOCOL_DOC)
+      .define(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, LIST, null, LOW, 
ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_DOC)
+      .define(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, LIST, null, LOW, 
ZkConfigs.ZK_SSL_CIPHER_SUITES_DOC)
+      .define(ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, STRING, 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
+      .define(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, BOOLEAN, 
ZkConfigs.ZK_SSL_CRL_ENABLE, LOW, ZkConfigs.ZK_SSL_CRL_ENABLE_DOC)
+      .define(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, BOOLEAN, 
ZkConfigs.ZK_SSL_OCSP_ENABLE, LOW, ZkConfigs.ZK_SSL_OCSP_ENABLE_DOC)
 
       /** ********* General Configuration ***********/
       .define(BrokerIdGenerationEnableProp, BOOLEAN, 
Defaults.BROKER_ID_GENERATION_ENABLE, MEDIUM, BrokerIdGenerationEnableDoc)
@@ -1417,12 +1334,12 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     super.valuesWithPrefixOverride(prefix)
 
   /** ********* Zookeeper Configuration ***********/
-  val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
-  val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
+  val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_PROP)
+  val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP)
   val zkConnectionTimeoutMs: Int =
-    
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
-  val zkEnableSecureAcls: Boolean = 
getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
-  val zkMaxInFlightRequests: Int = 
getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
+    
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP))
+  val zkEnableSecureAcls: Boolean = 
getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP)
+  val zkMaxInFlightRequests: Int = 
getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP)
 
   private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
   def remoteLogManagerConfig = _remoteLogManagerConfig
@@ -1470,21 +1387,21 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     }
   }
 
-  val zkSslClientEnable = 
zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslClientEnableProp)
-  val zkClientCnxnSocketClassName = 
zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkClientCnxnSocketProp)
-  val zkSslKeyStoreLocation = 
zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreLocationProp)
-  val zkSslKeyStorePassword = 
zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslKeyStorePasswordProp)
-  val zkSslKeyStoreType = 
zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreTypeProp)
-  val zkSslTrustStoreLocation = 
zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreLocationProp)
-  val zkSslTrustStorePassword = 
zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslTrustStorePasswordProp)
-  val zkSslTrustStoreType = 
zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreTypeProp)
-  val ZkSslProtocol = 
zkStringConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslProtocolProp)
-  val ZkSslEnabledProtocols = 
zkListConfigOrSystemProperty(KafkaConfig.ZkSslEnabledProtocolsProp)
-  val ZkSslCipherSuites = 
zkListConfigOrSystemProperty(KafkaConfig.ZkSslCipherSuitesProp)
+  val zkSslClientEnable = 
zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP)
+  val zkClientCnxnSocketClassName = 
zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP)
+  val zkSslKeyStoreLocation = 
zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP)
+  val zkSslKeyStorePassword = 
zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP)
+  val zkSslKeyStoreType = 
zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP)
+  val zkSslTrustStoreLocation = 
zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP)
+  val zkSslTrustStorePassword = 
zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP)
+  val zkSslTrustStoreType = 
zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP)
+  val ZkSslProtocol = 
zkStringConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_PROTOCOL_PROP)
+  val ZkSslEnabledProtocols = 
zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP)
+  val ZkSslCipherSuites = 
zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP)
   val ZkSslEndpointIdentificationAlgorithm = {
     // Use the system property if it exists and the Kafka config value was 
defaulted rather than actually provided
     // Need to translate any system property value from true/false to 
HTTPS/<blank>
-    val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
+    val kafkaProp = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP
     val actuallyProvided = originals.containsKey(kafkaProp)
     if (actuallyProvided)
       getString(kafkaProp)
@@ -1496,8 +1413,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       }
     }
   }
-  val ZkSslCrlEnable = 
zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslCrlEnableProp)
-  val ZkSslOcspEnable = 
zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslOcspEnableProp)
+  val ZkSslCrlEnable = 
zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP)
+  val ZkSslOcspEnable = 
zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP)
   /** ********* General Configuration ***********/
   val brokerIdGenerationEnable: Boolean = 
getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
@@ -2040,7 +1957,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     }
     if (requiresZookeeper) {
       if (zkConnect == null) {
-        throw new ConfigException(s"Missing required configuration 
`${KafkaConfig.ZkConnectProp}` which has no default value.")
+        throw new ConfigException(s"Missing required configuration 
`${ZkConfigs.ZK_CONNECT_PROP}` which has no default value.")
       }
       if (brokerIdGenerationEnable) {
         require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id 
must be greater than or equal to -1 and not greater than 
reserved.broker.max.id")
@@ -2055,7 +1972,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       }
       if (migrationEnabled) {
         if (zkConnect == null) {
-          throw new ConfigException(s"If using 
`${KafkaConfig.MigrationEnabledProp}` in KRaft mode, 
`${KafkaConfig.ZkConnectProp}` must also be set.")
+          throw new ConfigException(s"If using 
`${KafkaConfig.MigrationEnabledProp}` in KRaft mode, 
`${ZkConfigs.ZK_CONNECT_PROP}` must also be set.")
         }
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 55c773b0a46..fce61e17a22 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -57,7 +57,7 @@ import org.apache.kafka.server.NodeToControllerChannelManager
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
 import org.apache.kafka.server.fault.LoggingFaultHandler
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -80,20 +80,20 @@ object KafkaServer {
   def zkClientConfigFromKafkaConfig(config: KafkaConfig, 
forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
     val clientConfig = new ZKClientConfig
     if (config.zkSslClientEnable || forceZkSslClientEnable) {
-      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslClientEnableProp, "true")
-      
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkClientCnxnSocketProp, _))
-      
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkSslKeyStoreLocationProp, _))
-      config.zkSslKeyStorePassword.foreach(x => 
KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslKeyStorePasswordProp, x.value))
-      
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkSslKeyStoreTypeProp, _))
-      
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkSslTrustStoreLocationProp, _))
-      config.zkSslTrustStorePassword.foreach(x => 
KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslTrustStorePasswordProp, x.value))
-      
config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkSslTrustStoreTypeProp, _))
-      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslProtocolProp, config.ZkSslProtocol)
-      
config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkSslEnabledProtocolsProp, _))
-      
config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 KafkaConfig.ZkSslCipherSuitesProp, _))
-      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, 
config.ZkSslEndpointIdentificationAlgorithm)
-      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
-      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
+      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "true")
+      
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, _))
+      
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, _))
+      config.zkSslKeyStorePassword.foreach(x => 
KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, x.value))
+      
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, _))
+      
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, _))
+      config.zkSslTrustStorePassword.foreach(x => 
KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, x.value))
+      
config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, _))
+      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_PROTOCOL_PROP, config.ZkSslProtocol)
+      
config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, _))
+      
config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
 ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, _))
+      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, 
config.ZkSslEndpointIdentificationAlgorithm)
+      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, config.ZkSslCrlEnable.toString)
+      KafkaConfig.setZooKeeperClientProperty(clientConfig, 
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, config.ZkSslOcspEnable.toString)
     }
     // The zk sasl is enabled by default so it can produce false error when 
broker does not intend to use SASL.
     if (!JaasUtils.isZkSaslEnabled) 
clientConfig.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 26c9453bef2..749de428533 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -34,7 +34,7 @@ import 
org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
@@ -2349,9 +2349,9 @@ object KafkaZkClient {
 
     if (secureAclsEnabled && !isZkSecurityEnabled)
       throw new java.lang.SecurityException(
-        s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client 
TLS configuration identifying at least " +
-          s"${KafkaConfig.ZkSslClientEnableProp}, 
${KafkaConfig.ZkClientCnxnSocketProp}, and " +
-          s"${KafkaConfig.ZkSslKeyStoreLocationProp} was not present and the 
verification of the JAAS login file failed " +
+        s"${ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP} is true, but ZooKeeper 
client TLS configuration identifying at least " +
+          s"${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, 
${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and " +
+          s"${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP} was not present and 
the verification of the JAAS login file failed " +
           s"${JaasUtils.zkSecuritySysConfigString}")
 
     KafkaZkClient(config.zkConnect, secureAclsEnabled, 
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
diff --git 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 6ea0c2eaec0..77d0784b1a5 100644
--- 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
@@ -75,7 +76,7 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
   import DescribeAuthorizedOperationsTest._
 
   override val brokerCount = 1
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
   this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
classOf[AclAuthorizer].getName)
 
   var client: Admin = _
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 46e674c00aa..061ba3de9ea 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -18,9 +18,9 @@
 package kafka.api
 
 import com.yammer.metrics.core.Gauge
+
 import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
-
 import kafka.security.authorizer.AclAuthorizer
 import kafka.security.authorizer.AclEntry.WildcardHost
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
@@ -38,11 +38,12 @@ import org.apache.kafka.common.resource._
 import org.apache.kafka.common.resource.ResourceType._
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth._
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{ValueSource, CsvSource}
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import scala.jdk.CollectionConverters._
 
@@ -156,7 +157,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     } else {
       // The next two configuration parameters enable ZooKeeper secure ACLs
       // and sets the Kafka authorizer, both necessary to enable security.
-      this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+      this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, 
"true")
       this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
authorizerClass.getName)
 
       // Set the specific principal that can update ACLs.
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index cb4b5b96979..658b57cddd3 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.{InvalidTopicException, 
UnknownTopicOrPart
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.authenticator.TestJaasConfig
+import org.apache.kafka.server.config.ZkConfigs
 import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -43,7 +44,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
   private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
   private val kafkaServerJaasEntryName =
     
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false")
   this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, 
"false")
   this.serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, 
"2.8")
   this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index e084454f5ff..fe32106877a 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.resource.{PatternType, 
ResourcePattern, ResourceT
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, 
Uuid}
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
-import org.apache.kafka.server.config.Defaults
+import org.apache.kafka.server.config.{Defaults, ZkConfigs}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
@@ -2707,7 +2707,7 @@ object PlaintextAdminIntegrationTest {
     var topicConfigEntries2 = Seq(new 
ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava
 
     val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
test.brokers.head.config.brokerId.toString)
-    val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, 
"localhost:2181")).asJava
+    val brokerConfigEntries = Seq(new ConfigEntry(ZkConfigs.ZK_CONNECT_PROP, 
"localhost:2181")).asJava
 
     // Alter configs: first and third are invalid, second is valid
     var alterResult = admin.alterConfigs(Map(
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 6fc3fb9b8a1..4cf7a320ff2 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -12,10 +12,10 @@
   */
 package kafka.api
 
-import kafka.server.KafkaConfig
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
   private val kafkaClientSaslMechanism = "PLAIN"
   private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
   override protected val serverSaslProperties = 
Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism))
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 6a03d51aa36..f039e16faa3 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -13,10 +13,10 @@
 package kafka.api
 
 import java.util.Locale
-import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
 
 @Timeout(600)
@@ -26,7 +26,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest 
with SaslSetup {
   private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
   private val kafkaServerJaasEntryName =
     
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false")
   // disable secure acls of zkClient in QuorumTestHarness
   override protected def zkAclsEnabled = Some(false)
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 29b51713257..1d2f4eb8bbf 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, 
TOPIC}
 import org.apache.kafka.common.resource.{PatternType, Resource, 
ResourcePattern, ResourcePatternFilter, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -46,7 +47,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
 
   val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], 
classOf[AclAuthorizer])
 
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 0ea458e25aa..19b6e9e4614 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -12,14 +12,14 @@
   */
 package kafka.api
 
-import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 
 @Timeout(600)
 class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
 
diff --git 
a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 4e6ae3f8132..ff6ea03a677 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -14,10 +14,8 @@ package kafka.api
 
 import java.util
 import java.util.concurrent._
-
 import com.yammer.metrics.core.Gauge
 import kafka.security.authorizer.AclAuthorizer
-import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
CreateAclsResult}
 import org.apache.kafka.common.acl._
@@ -25,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, Test}
@@ -80,7 +79,7 @@ object SslAdminIntegrationTest {
 class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
   override val authorizationAdmin = new 
AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], 
classOf[AclAuthorizer])
 
-  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
 
   override protected def securityProtocol = SecurityProtocol.SSL
   override protected lazy val trustStoreFile = 
Some(TestUtils.tempFile("truststore", ".jks"))
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c984eae0272..dd85bd41cb0 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
 import org.apache.kafka.security.PasswordEncoder
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.ShutdownableThread
@@ -122,7 +122,7 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
         properties
       } else {
         val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
-        properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+        properties.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
         properties
       }
       props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
diff --git 
a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
 
b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index d4a1e8f1dd1..7bf86259230 100644
--- 
a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -25,6 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.Assertions.{assertThrows, fail}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{Tag, Timeout}
@@ -61,7 +62,7 @@ class KafkaServerKRaftRegistrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -99,7 +100,7 @@ class KafkaServerKRaftRegistrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 4d7ddc15cb3..16e1a290278 100644
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.coordinator.group.OffsetConfig
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
@@ -79,7 +80,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
       props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
         s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
       props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
-      props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+      props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, 
kafkaClientSaslMechanism)
       props.put(s"${new 
ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
         kafkaServerSaslMechanisms(SecureInternal).mkString(","))
diff --git 
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index a3e5ecc0814..ff8d2b797f3 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.security.PasswordEncoder
 import org.apache.kafka.server.ControllerRequestCompletionHandler
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, 
ProducerIdsBlock}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull, assertTrue, fail}
 import org.junit.jupiter.api.{Assumptions, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
@@ -177,7 +177,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -309,7 +309,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -443,7 +443,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -508,7 +508,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -576,7 +576,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -636,7 +636,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
@@ -711,7 +711,7 @@ class ZkMigrationIntegrationTest {
         setNumBrokerNodes(0).
         setNumControllerNodes(1).build())
       .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
-      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
       .build()
     try {
       kraftCluster.format()
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 4cec0d05e5b..3d97e7df171 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -25,6 +25,7 @@ import kafka.utils.TestUtils.assertBadConfigContainingMessage
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
 
@@ -155,7 +156,7 @@ class KafkaTest {
       "Missing required configuration `zookeeper.connect` which has no default 
value.")
 
     // Ensure that no exception is thrown once zookeeper.connect is defined 
(and we clear controller.listener.names)
-    propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     propertiesFile.setProperty(KafkaConfig.ControllerListenerNamesProp, "")
     KafkaConfig.fromProps(propertiesFile)
   }
@@ -232,61 +233,61 @@ class KafkaTest {
 
   @Test
   def testZkSslClientEnable(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslClientEnableProp, 
"zookeeper.ssl.client.enable",
+    testZkConfig(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, 
"zookeeper.ssl.client.enable",
       "zookeeper.client.secure", booleanPropValueToSet, config => 
Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false))
   }
 
   @Test
   def testZkSslKeyStoreLocation(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslKeyStoreLocationProp, 
"zookeeper.ssl.keystore.location",
+    testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, 
"zookeeper.ssl.keystore.location",
       "zookeeper.ssl.keyStore.location", stringPropValueToSet, config => 
config.zkSslKeyStoreLocation, stringPropValueToSet)
   }
 
   @Test
   def testZkSslTrustStoreLocation(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslTrustStoreLocationProp, 
"zookeeper.ssl.truststore.location",
+    testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, 
"zookeeper.ssl.truststore.location",
       "zookeeper.ssl.trustStore.location", stringPropValueToSet, config => 
config.zkSslTrustStoreLocation, stringPropValueToSet)
   }
 
   @Test
   def testZookeeperKeyStorePassword(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslKeyStorePasswordProp, 
"zookeeper.ssl.keystore.password",
+    testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, 
"zookeeper.ssl.keystore.password",
       "zookeeper.ssl.keyStore.password", passwordPropValueToSet, config => 
config.zkSslKeyStorePassword, new Password(passwordPropValueToSet))
   }
 
   @Test
   def testZookeeperTrustStorePassword(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslTrustStorePasswordProp, 
"zookeeper.ssl.truststore.password",
+    testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, 
"zookeeper.ssl.truststore.password",
       "zookeeper.ssl.trustStore.password", passwordPropValueToSet, config => 
config.zkSslTrustStorePassword, new Password(passwordPropValueToSet))
   }
 
   @Test
   def testZkSslKeyStoreType(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslKeyStoreTypeProp, 
"zookeeper.ssl.keystore.type",
+    testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, 
"zookeeper.ssl.keystore.type",
       "zookeeper.ssl.keyStore.type", stringPropValueToSet, config => 
config.zkSslKeyStoreType, stringPropValueToSet)
   }
 
   @Test
   def testZkSslTrustStoreType(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslTrustStoreTypeProp, 
"zookeeper.ssl.truststore.type",
+    testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, 
"zookeeper.ssl.truststore.type",
       "zookeeper.ssl.trustStore.type", stringPropValueToSet, config => 
config.zkSslTrustStoreType, stringPropValueToSet)
   }
 
   @Test
   def testZkSslProtocol(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslProtocolProp, "zookeeper.ssl.protocol",
+    testZkConfig(ZkConfigs.ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol",
       "zookeeper.ssl.protocol", stringPropValueToSet, config => 
Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2"))
   }
 
   @Test
   def testZkSslEnabledProtocols(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslEnabledProtocolsProp, 
"zookeeper.ssl.enabled.protocols",
+    testZkConfig(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, 
"zookeeper.ssl.enabled.protocols",
       "zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","), 
config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava)
   }
 
   @Test
   def testZkSslCipherSuites(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslCipherSuitesProp, 
"zookeeper.ssl.cipher.suites",
+    testZkConfig(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, 
"zookeeper.ssl.cipher.suites",
       "zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config 
=> config.ZkSslCipherSuites, listPropValueToSet.asJava)
   }
 
@@ -294,7 +295,7 @@ class KafkaTest {
   def testZkSslEndpointIdentificationAlgorithm(): Unit = {
     // this property is different than the others
     // because the system property values and the Kafka property values don't 
match
-    val kafkaPropName = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
+    val kafkaPropName = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP
     assertEquals("zookeeper.ssl.endpoint.identification.algorithm", 
kafkaPropName)
     val sysProp = "zookeeper.ssl.hostnameVerification"
     val expectedDefaultValue = "HTTPS"
@@ -327,13 +328,13 @@ class KafkaTest {
 
   @Test
   def testZkSslCrlEnable(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslCrlEnableProp, "zookeeper.ssl.crl.enable",
+    testZkConfig(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl.enable",
       "zookeeper.ssl.crl", booleanPropValueToSet, config => 
Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false))
   }
 
   @Test
   def testZkSslOcspEnable(): Unit = {
-    testZkConfig(KafkaConfig.ZkSslOcspEnableProp, "zookeeper.ssl.ocsp.enable",
+    testZkConfig(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, 
"zookeeper.ssl.ocsp.enable",
       "zookeeper.ssl.ocsp", booleanPropValueToSet, config => 
Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false))
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index d6718e4203c..2730431565e 100644
--- 
a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, 
IBP_0_10_2_IV0, IBP_0_9_0, IBP_1_0_IV0, IBP_2_2_IV0, IBP_2_4_IV0, IBP_2_4_IV1, 
IBP_2_6_IV0, IBP_2_8_IV1, IBP_3_2_IV0, IBP_3_4_IV0}
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -895,7 +896,7 @@ class ControllerChannelManagerTest {
   private def createConfig(interBrokerVersion: MetadataVersion): KafkaConfig = 
{
     val props = new Properties()
     props.put(KafkaConfig.BrokerIdProp, controllerId.toString)
-    props.put(KafkaConfig.ZkConnectProp, "zkConnect")
+    props.put(ZkConfigs.ZK_CONNECT_PROP, "zkConnect")
     TestUtils.setIbpAndMessageFormatVersions(props, interBrokerVersion)
     KafkaConfig.fromProps(props)
   }
diff --git 
a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 80790ab905f..29907f22068 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -41,6 +41,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, 
IBP_2_0_IV1}
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.zookeeper.client.ZKClientConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -869,7 +870,7 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(noTlsProps),
       noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { 
propName =>
       assertNull(zkClientConfig.getProperty(propName))
     }
   }
@@ -879,27 +880,27 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
     val props = new java.util.Properties()
     val kafkaValue = "kafkaValue"
     val configs = Map("zookeeper.connect" -> "somewhere", // required, 
otherwise we would omit it
-      KafkaConfig.ZkSslClientEnableProp -> "true",
-      KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
-      KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
-      KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue)
+      ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
+      ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue)
     configs.foreach { case (key, value) => props.put(key, value) }
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
     // confirm we get all the values we expect
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match 
{
-      case KafkaConfig.ZkSslClientEnableProp | 
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
+    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop 
=> prop match {
+      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
         assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
+      case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | 
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
         assertEquals("false", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case KafkaConfig.ZkSslProtocolProp =>
+      case ZkConfigs.ZK_SSL_PROTOCOL_PROP =>
         assertEquals("TLSv1.2", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       case _ => assertEquals(kafkaValue, 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
     })
@@ -910,29 +911,29 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
     val props = new java.util.Properties()
     val kafkaValue = "kafkaValue"
     val configs = Map("zookeeper.connect" -> "somewhere", // required, 
otherwise we would omit it
-      KafkaConfig.ZkSslClientEnableProp -> "true",
-      KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
-      KafkaConfig.ZkSslProtocolProp -> kafkaValue,
-      KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
-      KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
-      KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
-      KafkaConfig.ZkSslCrlEnableProp -> "false",
-      KafkaConfig.ZkSslOcspEnableProp -> "false")
+      ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
+      ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS",
+      ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false",
+      ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false")
     configs.foreach{case (key, value) => props.put(key, value.toString) }
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
     // confirm we get all the values we expect
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match 
{
-        case KafkaConfig.ZkSslClientEnableProp | 
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
+    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop 
=> prop match {
+        case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | 
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
           assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-        case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp 
=>
+        case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | 
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
           assertEquals("false", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
         case _ => assertEquals(kafkaValue, 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       })
@@ -945,43 +946,43 @@ class AuthorizerTest extends QuorumTestHarness with 
BaseAuthorizerTest {
     val prefixedValue = "prefixedValue"
     val prefix = "authorizer."
     val configs = Map("zookeeper.connect" -> "somewhere", // required, 
otherwise we would omit it
-      KafkaConfig.ZkSslClientEnableProp -> "false",
-      KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
-      KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
-      KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
-      KafkaConfig.ZkSslProtocolProp -> kafkaValue,
-      KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
-      KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
-      KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
-      KafkaConfig.ZkSslCrlEnableProp -> "false",
-      KafkaConfig.ZkSslOcspEnableProp -> "false",
-      prefix + KafkaConfig.ZkSslClientEnableProp -> "true",
-      prefix + KafkaConfig.ZkClientCnxnSocketProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslKeyStoreLocationProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslKeyStorePasswordProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslKeyStoreTypeProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslTrustStoreLocationProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslTrustStorePasswordProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslTrustStoreTypeProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslProtocolProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslEnabledProtocolsProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslCipherSuitesProp -> prefixedValue,
-      prefix + KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "",
-      prefix + KafkaConfig.ZkSslCrlEnableProp -> "true",
-      prefix + KafkaConfig.ZkSslOcspEnableProp -> "true")
+      ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "false",
+      ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue,
+      ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS",
+      ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false",
+      ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false",
+      prefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
+      prefix + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_PROTOCOL_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> prefixedValue,
+      prefix + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "",
+      prefix + ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "true",
+      prefix + ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "true")
     configs.foreach{case (key, value) => props.put(key, value.toString) }
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
     // confirm we get all the values we expect
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match 
{
-      case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp 
| KafkaConfig.ZkSslOcspEnableProp =>
+    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop 
=> prop match {
+      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | 
ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
         assertEquals("true", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
-      case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
+      case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
         assertEquals("false", 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       case _ => assertEquals(prefixedValue, 
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
     })
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 4e1c8eed276..8e7fe06ef2e 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.config.{ConfigException, 
SslConfigs}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.config.Defaults
+import org.apache.kafka.server.config.{Defaults, ZkConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.KafkaScheduler
@@ -211,7 +211,7 @@ class DynamicBrokerConfigTest {
 
     val securityPropsWithoutListenerPrefix = 
Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, 
securityPropsWithoutListenerPrefix)
-    val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
+    val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_PROP -> "somehost:2181")
     verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, 
nonDynamicProps)
 
     // Test update of configs with invalid type
@@ -709,7 +709,7 @@ class DynamicBrokerConfigTest {
   @Test
   def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     props.put(KafkaConfig.MetadataLogSegmentMinBytesProp, "1024")
     val config = new KafkaConfig(props)
     
assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a5d4d961fe1..fd4377f658e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
-import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.config.{ServerTopicConfigSynonyms, ZkConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 import org.junit.jupiter.api.function.Executable
@@ -155,7 +155,7 @@ class KafkaConfigTest {
     val hostName = "fake-host"
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     props.setProperty(KafkaConfig.ListenersProp, 
s"PLAINTEXT://$hostName:$port")
     val serverConfig = KafkaConfig.fromProps(props)
 
@@ -186,7 +186,7 @@ class KafkaConfigTest {
   def testDuplicateListeners(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     // listeners with duplicate port
     props.setProperty(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9091,SSL://localhost:9091")
@@ -212,7 +212,7 @@ class KafkaConfigTest {
   def testIPv4AndIPv6SamePortListeners(): Unit = {
     val props = new Properties()
     props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
     var caught = assertThrows(classOf[IllegalArgumentException], () => 
KafkaConfig.fromProps(props))
@@ -452,7 +452,7 @@ class KafkaConfigTest {
   def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): 
Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
     assertBadConfigContainingMessage(props,
       "Error creating broker listeners from 'CONTROLLER://localhost:9092': No 
security protocol defined for listener CONTROLLER")
@@ -465,7 +465,7 @@ class KafkaConfigTest {
   def testBadListenerProtocol(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091")
 
     assertFalse(isValidKafkaConfig(props))
@@ -475,7 +475,7 @@ class KafkaConfigTest {
   def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     props.setProperty(KafkaConfig.ListenersProp, 
"CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
     props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
"CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
@@ -499,7 +499,7 @@ class KafkaConfigTest {
   def testListenerAndAdvertisedListenerNames(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     props.setProperty(KafkaConfig.ListenersProp, 
"EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
     props.setProperty(KafkaConfig.AdvertisedListenersProp, 
"EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
@@ -530,7 +530,7 @@ class KafkaConfigTest {
   def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     props.setProperty(KafkaConfig.ListenersProp, 
"SSL://localhost:9091,REPLICATION://localhost:9092")
     props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
@@ -541,7 +541,7 @@ class KafkaConfigTest {
   def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): 
Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
     props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
@@ -552,7 +552,7 @@ class KafkaConfigTest {
   def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
     props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
@@ -564,7 +564,7 @@ class KafkaConfigTest {
   def testCaseInsensitiveListenerProtocol(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     props.setProperty(KafkaConfig.ListenersProp, 
"plaintext://localhost:9091,SsL://localhost:9092")
     val config = KafkaConfig.fromProps(props)
     assertEquals(Some("SSL://localhost:9092"), 
config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
@@ -579,7 +579,7 @@ class KafkaConfigTest {
   def testListenerDefaults(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
 
     // configuration with no listeners
     val conf = KafkaConfig.fromProps(props)
@@ -593,7 +593,7 @@ class KafkaConfigTest {
   def testVersionConfiguration(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.BrokerIdProp, "1")
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     val conf = KafkaConfig.fromProps(props)
     assertEquals(MetadataVersion.latestProduction, 
conf.interBrokerProtocolVersion)
 
@@ -766,7 +766,7 @@ class KafkaConfigTest {
   def testFromPropsInvalid(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, 
"127.0.0.1:2181")
+      validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, 
"127.0.0.1:2181")
       validRequiredProperties
     }
     // to ensure a basis is valid - bootstraps all needed validation
@@ -774,25 +774,25 @@ class KafkaConfigTest {
 
     KafkaConfig.configNames.foreach { name =>
       name match {
-        case KafkaConfig.ZkConnectProp => // ignore string
-        case KafkaConfig.ZkSessionTimeoutMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.ZkConnectionTimeoutMsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
-        case KafkaConfig.ZkEnableSecureAclsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
-        case KafkaConfig.ZkMaxInFlightRequestsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
-        case KafkaConfig.ZkSslClientEnableProp => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
-        case KafkaConfig.ZkClientCnxnSocketProp =>  //ignore string
-        case KafkaConfig.ZkSslKeyStoreLocationProp =>  //ignore string
-        case KafkaConfig.ZkSslKeyStorePasswordProp =>  //ignore string
-        case KafkaConfig.ZkSslKeyStoreTypeProp =>  //ignore string
-        case KafkaConfig.ZkSslTrustStoreLocationProp =>  //ignore string
-        case KafkaConfig.ZkSslTrustStorePasswordProp =>  //ignore string
-        case KafkaConfig.ZkSslTrustStoreTypeProp =>  //ignore string
-        case KafkaConfig.ZkSslProtocolProp =>  //ignore string
-        case KafkaConfig.ZkSslEnabledProtocolsProp =>  //ignore string
-        case KafkaConfig.ZkSslCipherSuitesProp =>  //ignore string
-        case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore 
string
-        case KafkaConfig.ZkSslCrlEnableProp => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
-        case KafkaConfig.ZkSslOcspEnableProp => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case ZkConfigs.ZK_CONNECT_PROP => // ignore string
+        case ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
+        case ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
+        case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_PROTOCOL_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP =>  //ignore string
+        case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => 
//ignore string
+        case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
 
         case KafkaConfig.BrokerIdProp => assertPropertyInvalid(baseProperties, 
name, "not_a_number")
         case KafkaConfig.NumNetworkThreadsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
@@ -1051,7 +1051,7 @@ class KafkaConfigTest {
   def testDynamicLogConfigs(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, 
"127.0.0.1:2181")
+      validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, 
"127.0.0.1:2181")
       validRequiredProperties
     }
 
@@ -1141,9 +1141,9 @@ class KafkaConfigTest {
   @Test
   def testSpecificProperties(): Unit = {
     val defaults = new Properties()
-    defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    defaults.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
-    defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, "1234")
     defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1")
     defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
@@ -1187,7 +1187,7 @@ class KafkaConfigTest {
   @Test
   def testNonroutableAdvertisedListeners(): Unit = {
     val props = new Properties()
-    props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
     props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
     assertFalse(isValidKafkaConfig(props))
   }
@@ -1601,7 +1601,7 @@ class KafkaConfigTest {
   @Test
   def testSaslJwksEndpointRetryDefaults(): Unit = {
     val props = new Properties()
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     val config = KafkaConfig.fromProps(props)
     
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
     
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
@@ -1769,7 +1769,7 @@ class KafkaConfigTest {
       "If using `zookeeper.metadata.migration.enable` in KRaft mode, 
`zookeeper.connect` must also be set.",
       assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(props)).getMessage)
 
-    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
     KafkaConfig.fromProps(props)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index bf6312f8631..035f60dab1d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -25,8 +25,11 @@ import org.junit.jupiter.api.Test
 import java.util.Properties
 import java.net.{InetAddress, ServerSocket}
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 
+import scala.jdk.CollectionConverters._
+
 class KafkaServerTest extends QuorumTestHarness {
 
   @Test
@@ -64,7 +67,7 @@ class KafkaServerTest extends QuorumTestHarness {
   @Test
   def testCreatesProperZkConfigWhenSaslDisabled(): Unit = {
     val props = new Properties
-    props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we 
would leave it out
+    props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we 
would leave it out
     val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
     assertEquals("false", zkClientConfig.getProperty(JaasUtils.ZK_SASL_CLIENT))
   }
@@ -72,10 +75,10 @@ class KafkaServerTest extends QuorumTestHarness {
   @Test
   def testCreatesProperZkTlsConfigWhenDisabled(): Unit = {
     val props = new Properties
-    props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we 
would leave it out
-    props.put(KafkaConfig.ZkSslClientEnableProp, "false")
+    props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we 
would leave it out
+    props.put(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "false")
     val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+    ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { 
propName =>
       assertNull(zkClientConfig.getProperty(propName))
     }
   }
@@ -83,51 +86,51 @@ class KafkaServerTest extends QuorumTestHarness {
   @Test
   def testCreatesProperZkTlsConfigWithTrueValues(): Unit = {
     val props = new Properties
-    props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we 
would leave it out
+    props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we 
would leave it out
     // should get correct config for all properties if TLS is enabled
     val someValue = "some_value"
     def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
-      case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp 
| KafkaConfig.ZkSslOcspEnableProp => "true"
-      case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "HTTPS"
+      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | 
ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true"
+      case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "HTTPS"
       case _ => someValue
     }
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => 
props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
+    
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp 
=> props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
     val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
     // now check to make sure the values were set correctly
     def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
-      case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp 
| KafkaConfig.ZkSslOcspEnableProp => "true"
-      case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "true"
+      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | 
ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true"
+      case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "true"
       case _ => someValue
     }
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
-      assertEquals(zkClientValueToExpect(kafkaProp), 
zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+    
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp =>
+      assertEquals(zkClientValueToExpect(kafkaProp), 
zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp))))
   }
 
   @Test
   def testCreatesProperZkTlsConfigWithFalseAndListValues(): Unit = {
     val props = new Properties
-    props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we 
would leave it out
+    props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we 
would leave it out
     // should get correct config for all properties if TLS is enabled
     val someValue = "some_value"
     def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
-      case KafkaConfig.ZkSslClientEnableProp => "true"
-      case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => 
"false"
-      case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => ""
-      case KafkaConfig.ZkSslEnabledProtocolsProp | 
KafkaConfig.ZkSslCipherSuitesProp => "A,B"
+      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true"
+      case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | 
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false"
+      case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => ""
+      case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | 
ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B"
       case _ => someValue
     }
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => 
props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
+    
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp 
=> props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
     val zkClientConfig = 
KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
     // now check to make sure the values were set correctly
     def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
-      case KafkaConfig.ZkSslClientEnableProp => "true"
-      case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => 
"false"
-      case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "false"
-      case KafkaConfig.ZkSslEnabledProtocolsProp | 
KafkaConfig.ZkSslCipherSuitesProp => "A,B"
+      case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true"
+      case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | 
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false"
+      case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "false"
+      case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | 
ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B"
       case _ => someValue
     }
-    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
-      assertEquals(zkClientValueToExpect(kafkaProp), 
zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+    
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp =>
+      assertEquals(zkClientValueToExpect(kafkaProp), 
zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp))))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 844701cc371..0e04bad7c8e 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, 
IntegerSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
@@ -150,8 +151,8 @@ class ServerShutdownTest extends KafkaServerTestHarness {
       shutdownKRaftController()
       verifyCleanShutdownAfterFailedStartup[CancellationException]
     } else {
-      
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, 
"50")
-      propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, 
"some.invalid.hostname.foo.bar.local:65535")
+      
propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, 
"50")
+      propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECT_PROP, 
"some.invalid.hostname.foo.bar.local:65535")
       verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException]
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index d72ad2d7bcd..fb80c4b890d 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -20,6 +20,7 @@ import java.util.Properties
 
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metrics.MetricsContext
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -54,7 +55,7 @@ class ServerTest {
 
     val props = new Properties()
     props.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-    props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:0")
+    props.put(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:0")
     val config = KafkaConfig.fromProps(props)
 
     val context = Server.createKafkaMetricsContext(config, clusterId)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 44243d39ce8..33a4b27fd8b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -74,7 +74,7 @@ import org.apache.kafka.metadata.properties.MetaProperties
 import org.apache.kafka.server.ControllerRequestCompletionHandler
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
Authorizer => JAuthorizer}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.Defaults
+import org.apache.kafka.server.config.{Defaults, ZkConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -359,8 +359,8 @@ object TestUtils extends Logging {
       // controllerQuorumVotersFuture instead.
       props.put(KafkaConfig.QuorumVotersProp, "1000@localhost:0")
     } else {
-      props.put(KafkaConfig.ZkConnectProp, zkConnect)
-      props.put(KafkaConfig.ZkConnectionTimeoutMsProp, "10000")
+      props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect)
+      props.put(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "10000")
     }
     props.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "1500")
     props.put(KafkaConfig.ControllerSocketTimeoutMsProp, "1500")
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index a1a45784ad7..aa97f6831c3 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.zookeeper.KeeperException.{Code, NoAuthException, 
NoNodeException, NodeExistsException}
 import org.apache.zookeeper.{CreateMode, ZooDefs}
@@ -106,7 +106,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
     // TLS connectivity itself is tested in system tests rather than here to 
avoid having to add TLS support
     // to kafka.zk.EmbeddedZookeeper
     val clientConfig = new ZKClientConfig()
-    val propKey = KafkaConfig.ZkClientCnxnSocketProp
+    val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP
     val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
     KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
     val client = KafkaZkClient(zkConnect, 
zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
diff --git 
a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala 
b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
index 569cb5764b4..b4dccaecb0a 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -21,6 +21,7 @@ import kafka.zk.ZkMigrationClient
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.security.PasswordEncoder
+import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 import java.util.Properties
@@ -40,7 +41,7 @@ class ZkMigrationTestHarness extends QuorumTestHarness {
 
   val encoder: PasswordEncoder = {
     val encoderProps = new Properties()
-    encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get 
around the config validation
+    encoderProps.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:1234") // Get 
around the config validation
     encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk 
secret to encrypt the
     val encoderConfig = new KafkaConfig(encoderProps)
     PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get,
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index ebb6ccf4453..affe6c3839c 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.TestUtils
 import kafka.server.QuorumTestHarness
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
@@ -102,7 +103,7 @@ class ZooKeeperClientTest extends QuorumTestHarness {
     // TLS connectivity itself is tested in system tests rather than here to 
avoid having to add TLS support
     // to kafka.zk.EmbeddedZookeeper
     val clientConfig = new ZKClientConfig()
-    val propKey = KafkaConfig.ZkClientCnxnSocketProp
+    val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP
     val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
     KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
     val client = newZooKeeperClient(clientConfig = clientConfig)
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 75fcf3f24b6..f4d17ed0740 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZkConfigs;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -176,7 +177,7 @@ public class MetadataRequestBenchmark {
 
     private KafkaApis createKafkaApis() {
         Properties kafkaProps =  new Properties();
-        kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk");
+        kafkaProps.put(ZkConfigs.ZK_CONNECT_PROP, "zk");
         kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
         KafkaConfig config = new KafkaConfig(kafkaProps);
         return new KafkaApisBuilder().
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java 
b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index 0c7ca123b87..5b425c0dc49 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -44,16 +44,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class Defaults {
-    /** ********* Zookeeper Configuration *********/
-    public static final int ZK_SESSION_TIMEOUT_MS = 18000;
-    public static final boolean ZK_ENABLE_SECURE_ACLS = false;
-    public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
-    public static final boolean ZK_SSL_CLIENT_ENABLE = false;
-    public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
-    public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = 
"HTTPS";
-    public static final boolean ZK_SSL_CRL_ENABLE = false;
-    public static final boolean ZK_SSL_OCSP_ENABLE = false;
-
     /** ********* General Configuration *********/
     public static final boolean BROKER_ID_GENERATION_ENABLE = true;
     public static final int MAX_RESERVED_BROKER_ID = 1000;
diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java 
b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
new file mode 100644
index 00000000000..2ddd2ef2145
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class ZkConfigs {
+    /** ********* Zookeeper Configuration ***********/
+    public static final String ZK_CONNECT_PROP = "zookeeper.connect";
+    public static final String ZK_SESSION_TIMEOUT_MS_PROP = 
"zookeeper.session.timeout.ms";
+    public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = 
"zookeeper.connection.timeout.ms";
+    public static final String ZK_ENABLE_SECURE_ACLS_PROP = 
"zookeeper.set.acl";
+    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = 
"zookeeper.max.in.flight.requests";
+    public static final String ZK_SSL_CLIENT_ENABLE_PROP = 
"zookeeper.ssl.client.enable";
+    public static final String ZK_CLIENT_CNXN_SOCKET_PROP = 
"zookeeper.clientCnxnSocket";
+    public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = 
"zookeeper.ssl.keystore.location";
+    public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = 
"zookeeper.ssl.keystore.password";
+    public static final String ZK_SSL_KEY_STORE_TYPE_PROP = 
"zookeeper.ssl.keystore.type";
+    public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = 
"zookeeper.ssl.truststore.location";
+    public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = 
"zookeeper.ssl.truststore.password";
+    public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = 
"zookeeper.ssl.truststore.type";
+    public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
+    public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = 
"zookeeper.ssl.enabled.protocols";
+    public static final String ZK_SSL_CIPHER_SUITES_PROP = 
"zookeeper.ssl.cipher.suites";
+    public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = 
"zookeeper.ssl.endpoint.identification.algorithm";
+    public static final String ZK_SSL_CRL_ENABLE_PROP = 
"zookeeper.ssl.crl.enable";
+    public static final String ZK_SSL_OCSP_ENABLE_PROP = 
"zookeeper.ssl.ocsp.enable";
+
+    public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper 
connection string in the form <code>hostname:port</code> where host and port 
are the " +
+        "host and port of a ZooKeeper server. To allow connecting through 
other ZooKeeper nodes when that ZooKeeper machine is " +
+        "down you can also specify multiple hosts in the form 
<code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
+        "The server can also have a ZooKeeper chroot path as part of its 
ZooKeeper connection string which puts its data under some path in the global 
ZooKeeper namespace. " +
+        "For example to give a chroot path of <code>/chroot/path</code> you 
would give the connection string as 
<code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>.";
+    public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session 
timeout";
+    public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time 
that the client waits to establish a connection to ZooKeeper. If not set, the 
value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
+    public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use 
secure ACLs";
+    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum 
number of unacknowledged requests the client will send to ZooKeeper before 
blocking.";
+    public static final String ZK_SSL_CLIENT_ENABLE_DOC;
+    public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
+    public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
+    public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
+    public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
+    public static final String ZK_SSL_TRUST_STORE_LOCATION_DOC;
+    public static final String ZK_SSL_TRUST_STORE_PASSWORD_DOC;
+    public static final String ZK_SSL_TRUST_STORE_TYPE_DOC;
+    public static final String ZK_SSL_PROTOCOL_DOC;
+    public static final String ZK_SSL_ENABLED_PROTOCOLS_DOC;
+    public static final String ZK_SSL_CIPHER_SUITES_DOC;
+    public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC;
+    public static final String ZK_SSL_CRL_ENABLE_DOC;
+    public static final String ZK_SSL_OCSP_ENABLE_DOC;
+
+    // a map from the Kafka config to the corresponding ZooKeeper Java system 
property
+    public static final Map<String, String> 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
+
+    public static final int ZK_SESSION_TIMEOUT_MS = 18000;
+    public static final boolean ZK_ENABLE_SECURE_ACLS = false;
+    public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
+    public static final boolean ZK_SSL_CLIENT_ENABLE = false;
+    public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
+    public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = 
"HTTPS";
+    public static final boolean ZK_SSL_CRL_ENABLE = false;
+    public static final boolean ZK_SSL_OCSP_ENABLE = false;
+
+    // See ZKClientConfig.SECURE_CLIENT
+    private static final String SECURE_CLIENT = "zookeeper.client.secure";
+    // See ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET
+    private static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = 
"zookeeper.clientCnxnSocket";
+
+    static {
+        Map<String, String> zkSslConfigToSystemPropertyMap = new HashMap<>();
+
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_CLIENT_ENABLE_PROP, 
SECURE_CLIENT);
+        zkSslConfigToSystemPropertyMap.put(ZK_CLIENT_CNXN_SOCKET_PROP, 
ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_LOCATION_PROP, 
"zookeeper.ssl.keyStore.location");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_PASSWORD_PROP, 
"zookeeper.ssl.keyStore.password");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_TYPE_PROP, 
"zookeeper.ssl.keyStore.type");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_LOCATION_PROP, 
"zookeeper.ssl.trustStore.location");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_PASSWORD_PROP, 
"zookeeper.ssl.trustStore.password");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_TYPE_PROP, 
"zookeeper.ssl.trustStore.type");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_PROTOCOL_PROP, 
"zookeeper.ssl.protocol");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENABLED_PROTOCOLS_PROP, 
"zookeeper.ssl.enabledProtocols");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_CIPHER_SUITES_PROP, 
"zookeeper.ssl.ciphersuites");
+        
zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP,
 "zookeeper.ssl.hostnameVerification");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_CRL_ENABLE_PROP, 
"zookeeper.ssl.crl");
+        zkSslConfigToSystemPropertyMap.put(ZK_SSL_OCSP_ENABLE_PROP, 
"zookeeper.ssl.ocsp");
+
+        ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = 
Collections.unmodifiableMap(zkSslConfigToSystemPropertyMap);
+
+        ZK_SSL_CLIENT_ENABLE_DOC = "Set client to use TLS when connecting to 
ZooKeeper." +
+            " An explicit value overrides any value set via the 
<code>zookeeper.client.secure</code> system property (note the different 
name)." +
+            " Defaults to false if neither is set; when true, <code>" + 
ZK_CLIENT_CNXN_SOCKET_PROP + "</code> must be set (typically to 
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set 
may include " +
+            ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.keySet().stream().filter(x -> 
!x.equals(ZK_SSL_CLIENT_ENABLE_PROP) && 
!x.equals(ZK_CLIENT_CNXN_SOCKET_PROP)).sorted().collect(Collectors.joining("<code>",
 "</code>, <code>", "</code>"));
+        ZK_CLIENT_CNXN_SOCKET_DOC = "Typically set to 
<code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS 
connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the same-named <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_CLIENT_CNXN_SOCKET_PROP) + "</code> 
system property.";
+        ZK_SSL_KEY_STORE_LOCATION_DOC = "Keystore location when using a 
client-side certificate with TLS connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_LOCATION_PROP) + 
"</code> system property (note the camelCase).";
+        ZK_SSL_KEY_STORE_PASSWORD_DOC = "Keystore password when using a 
client-side certificate with TLS connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_PASSWORD_PROP) + 
"</code> system property (note the camelCase)." +
+            " Note that ZooKeeper does not support a key password different 
from the keystore password, so be sure to set the key password in the keystore 
to be identical to the keystore password; otherwise the connection attempt to 
Zookeeper will fail.";
+        ZK_SSL_KEY_STORE_TYPE_DOC = "Keystore type when using a client-side 
certificate with TLS connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_TYPE_PROP) + "</code> 
system property (note the camelCase)." +
+            " The default value of <code>null</code> means the type will be 
auto-detected based on the filename extension of the keystore.";
+        ZK_SSL_TRUST_STORE_LOCATION_DOC = "Truststore location when using TLS 
connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_LOCATION_PROP) + 
"</code> system property (note the camelCase).";
+        ZK_SSL_TRUST_STORE_PASSWORD_DOC = "Truststore password when using TLS 
connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_PASSWORD_PROP) + 
"</code> system property (note the camelCase).";
+        ZK_SSL_TRUST_STORE_TYPE_DOC = "Truststore type when using TLS 
connectivity to ZooKeeper." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_TYPE_PROP) + 
"</code> system property (note the camelCase)." +
+            " The default value of <code>null</code> means the type will be 
auto-detected based on the filename extension of the truststore.";
+        ZK_SSL_PROTOCOL_DOC = "Specifies the protocol to be used in ZooKeeper 
TLS negotiation." +
+            " An explicit value overrides any value set via the same-named 
<code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_PROTOCOL_PROP) + 
"</code> system property.";
+        ZK_SSL_ENABLED_PROTOCOLS_DOC = "Specifies the enabled protocol(s) in 
ZooKeeper TLS negotiation (csv)." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENABLED_PROTOCOLS_PROP) + 
"</code> system property (note the camelCase)." +
+            " The default value of <code>null</code> means the enabled 
protocol will be the value of the <code>" + ZK_SSL_PROTOCOL_PROP + "</code> 
configuration property.";
+        ZK_SSL_CIPHER_SUITES_DOC = "Specifies the enabled cipher suites to be 
used in ZooKeeper TLS negotiation (csv)." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CIPHER_SUITES_PROP) + "</code> 
system property (note the single word \"ciphersuites\")." +
+            " The default value of <code>null</code> means the list of enabled 
cipher suites is determined by the Java runtime being used.";
+        ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "Specifies whether to 
enable hostname verification in the ZooKeeper TLS negotiation process, with 
(case-insensitively) \"https\" meaning ZooKeeper hostname verification is 
enabled and an explicit blank value meaning it is disabled (disabling it is 
only recommended for testing purposes)." +
+            " An explicit value overrides any \"true\" or \"false\" value set 
via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP)
 + "</code> system property (note the different name and values; true implies 
https and false implies blank).";
+        ZK_SSL_CRL_ENABLE_DOC = "Specifies whether to enable Certificate 
Revocation List in the ZooKeeper TLS protocols." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CRL_ENABLE_PROP) + "</code> 
system property (note the shorter name).";
+        ZK_SSL_OCSP_ENABLE_DOC = "Specifies whether to enable Online 
Certificate Status Protocol in the ZooKeeper TLS protocols." +
+            " Overrides any explicit value set via the <code>" + 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_OCSP_ENABLE_PROP) + "</code> 
system property (note the shorter name).";
+    }
+
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 4232e1d74c9..89af1d5729e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.server.config.ConfigType;
+import org.apache.kafka.server.config.ZkConfigs;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.test.TestCondition;
@@ -109,7 +110,7 @@ public class EmbeddedKafkaCluster {
         zookeeper = new EmbeddedZookeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
-        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString());
         putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), 
"PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
         putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, 
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 7f945ffe06d..4b57d576ae1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.config.ZkConfigs;
 import org.apache.kafka.server.util.MockTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class KafkaEmbedded {
         effectiveConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
         effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
         effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true);
-        effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000);
+        effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, 10000);
 
         effectiveConfig.putAll(initialConfig);
         effectiveConfig.setProperty(KafkaConfig.LogDirProp(), 
logDir.getAbsolutePath());


Reply via email to