This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c92f16a727e KAFKA-17787: Removed --zookeeper option and logic from ConfigCommand (#17507)
c92f16a727e is described below

commit c92f16a727ebb44b0c44038fbfa254d425dffcae
Author: Chengyan <[email protected]>
AuthorDate: Tue Nov 12 01:50:49 2024 +0800

    KAFKA-17787: Removed --zookeeper option and logic from ConfigCommand (#17507)
    
    
    Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
    Co-authored-by: Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 352 +--------
 .../test/java/kafka/admin/ConfigCommandTest.java   | 862 +--------------------
 2 files changed, 40 insertions(+), 1174 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 97c727826a9..993a2abe5c4 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -19,27 +19,21 @@ package kafka.admin
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
-import java.util.{Collections, Optional, Properties}
+import java.util.{Collections, Properties}
 import joptsimple._
-import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
+import kafka.server.DynamicConfig
 import kafka.utils.Implicits._
 import kafka.utils.Logging
-import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, 
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, 
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, 
ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, 
ScramFormatter, ScramMechanism}
-import org.apache.kafka.common.utils.{Exit, Sanitizer, Time, Utils}
-import org.apache.kafka.server.config.{ConfigType, QuotaConfig, ZkConfigs, 
ZooKeeperInternals}
-import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.utils.{Exit, Utils}
+import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.storage.internals.log.LogConfig
-import org.apache.zookeeper.client.ZKClientConfig
-
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -63,24 +57,12 @@ import scala.collection._
  * when describing or altering default configuration for users, clients, 
brokers, or ips, respectively.
  * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or 
--ip-defaults may be specified in place of
  * --entity-type <users|clients|brokers|ips> --entity-default, respectively.
- *
- * For most use cases, this script communicates with a kafka cluster 
(specified via the
- * `--bootstrap-server` option). There are three exceptions where direct 
communication with a
- * ZooKeeper ensemble (specified via the `--zookeeper` option) is allowed:
- *
- * 1. Describe/alter user configs where the config is a SCRAM mechanism name 
(i.e. a SCRAM credential for a user)
- * 2. Describe/alter broker configs for a particular broker when that broker 
is down
- * 3. Describe/alter broker default configs when all brokers are down
- *
- * For example, this allows password configs to be stored encrypted in ZK 
before brokers are started,
- * avoiding cleartext passwords in `server.properties`.
  */
 object ConfigCommand extends Logging {
 
-  val BrokerDefaultEntityName = ""
+  private val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
   private val BrokerSupportedConfigTypes = ConfigType.ALL.asScala :+ 
BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS :+ ConfigType.GROUP
-  private val ZkSupportedConfigTypes = Seq(ConfigType.USER, ConfigType.BROKER)
   private val DefaultScramIterations = 4096
 
   def main(args: Array[String]): Unit = {
@@ -91,14 +73,7 @@ object ConfigCommand extends Logging {
         "This tool helps to manipulate and describe entity config for a topic, 
client, user, broker, ip, client-metrics or group")
 
       opts.checkArgs()
-
-      if (opts.options.has(opts.zkConnectOpt)) {
-        System.out.println(s"Warning: --zookeeper is deprecated and will be 
removed in a future version of Kafka.")
-        System.out.println(s"Use --bootstrap-server instead to specify a 
broker to connect to.")
-        processCommandWithZk(opts.options.valueOf(opts.zkConnectOpt), opts)
-      } else {
-        processCommand(opts)
-      }
+      processCommand(opts)
     } catch {
       case e @ (_: IllegalArgumentException | _: InvalidConfigurationException 
| _: OptionException) =>
         logger.debug(s"Failed config command with args '${args.mkString(" 
")}'", e)
@@ -113,176 +88,6 @@ object ConfigCommand extends Logging {
     }
   }
 
-  private def processCommandWithZk(zkConnectString: String, opts: 
ConfigCommandOptions): Unit = {
-    val zkClientConfig = 
ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, 
opts.zkTlsConfigFile)
-      .getOrElse(new ZKClientConfig())
-    val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || 
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
-      Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = 
"ConfigCommand", enableEntityConfigControllerCheck = false)
-    val adminZkClient = new AdminZkClient(zkClient)
-    try {
-      if (opts.options.has(opts.alterOpt))
-        alterConfigWithZk(zkClient, opts, adminZkClient)
-      else if (opts.options.has(opts.describeOpt))
-        describeConfigWithZk(zkClient, opts, adminZkClient)
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def alterConfigWithZk(zkClient: KafkaZkClient, opts: ConfigCommandOptions, 
adminZkClient: AdminZkClient): Unit = {
-    val configsToBeAdded = parseConfigsToBeAdded(opts)
-    val configsToBeDeleted = parseConfigsToBeDeleted(opts)
-    val entity = parseEntity(opts)
-    val entityType = entity.root.entityType
-    val entityName = entity.fullSanitizedName
-    val errorMessage = s"--bootstrap-server option must be specified to update 
$entityType configs: {add: $configsToBeAdded, delete: $configsToBeDeleted}"
-    var isUserClientId = false
-
-    if (entityType == ConfigType.USER) {
-      isUserClientId = entity.child.exists(e => 
ConfigType.CLIENT.equals(e.entityType))
-      if (!configsToBeAdded.isEmpty || configsToBeDeleted.nonEmpty) {
-        val info = "User configuration updates using ZooKeeper are only 
supported for SCRAM credential updates."
-        val scramMechanismNames = ScramMechanism.values.map(_.mechanismName)
-        // make sure every added/deleted configs are SCRAM related, other 
configs are not supported using zookeeper
-        
require(configsToBeAdded.stringPropertyNames.asScala.forall(scramMechanismNames.contains),
-          s"$errorMessage. $info")
-        require(configsToBeDeleted.forall(scramMechanismNames.contains), 
s"$errorMessage. $info")
-      }
-      preProcessScramCredentials(configsToBeAdded)
-    } else if (entityType == ConfigType.BROKER) {
-      // Dynamic broker configs can be updated using ZooKeeper only if the 
corresponding broker is not running.
-      if (!configsToBeAdded.isEmpty || configsToBeDeleted.nonEmpty) {
-        validateBrokersNotRunning(entityName, adminZkClient, zkClient, 
errorMessage)
-
-        val perBrokerConfig = entityName != ZooKeeperInternals.DEFAULT_STRING
-        preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig)
-      }
-    }
-
-    // compile the final set of configs
-    val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
-
-    // fail the command if any of the configs to be deleted does not exist
-    val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
-    if (invalidConfigs.nonEmpty)
-      throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-    configs ++= configsToBeAdded
-    configsToBeDeleted.foreach(configs.remove(_))
-
-    adminZkClient.changeConfigs(entityType, entityName, configs, 
isUserClientId)
-
-    System.out.println(s"Completed updating config for entity: $entity.")
-  }
-
-  private def validateBrokersNotRunning(entityName: String,
-                                        adminZkClient: AdminZkClient,
-                                        zkClient: KafkaZkClient,
-                                        errorMessage: String): Unit = {
-    val perBrokerConfig = entityName != ZooKeeperInternals.DEFAULT_STRING
-    val info = "Broker configuration operations using ZooKeeper are only 
supported if the affected broker(s) are not running."
-    if (perBrokerConfig) {
-      adminZkClient.parseBroker(entityName).foreach { brokerId =>
-        require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage - broker 
$brokerId is running. $info")
-      }
-    } else {
-      val runningBrokersCount = zkClient.getAllBrokersInCluster.size
-      require(runningBrokersCount == 0, s"$errorMessage - $runningBrokersCount 
brokers are running. $info")
-    }
-  }
-
-  private def preProcessScramCredentials(configsToBeAdded: Properties): Unit = 
{
-    def scramCredential(mechanism: ScramMechanism, credentialStr: String): 
String = {
-      val pattern = "(?:iterations=([0-9]*),)?password=(.*)".r
-      val (iterations, password) = credentialStr match {
-          case pattern(iterations, password) => (if (iterations != null) 
iterations.toInt else DefaultScramIterations, password)
-          case _ => throw new IllegalArgumentException(s"Invalid credential 
property $mechanism=$credentialStr")
-        }
-      if (iterations < mechanism.minIterations())
-        throw new IllegalArgumentException(s"Iterations $iterations is less 
than the minimum ${mechanism.minIterations()} required for $mechanism")
-      val credential = new 
ScramFormatter(mechanism).generateCredential(password, iterations)
-      ScramCredentialUtils.credentialToString(credential)
-    }
-    for (mechanism <- ScramMechanism.values) {
-      configsToBeAdded.getProperty(mechanism.mechanismName) match {
-        case null =>
-        case value =>
-          configsToBeAdded.setProperty(mechanism.mechanismName, 
scramCredential(mechanism, value))
-      }
-    }
-  }
-
-  def createPasswordEncoder(encoderConfigs: java.util.Map[String, String]): 
PasswordEncoder = {
-    val encoderSecret = 
Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
-      .orElseThrow(() => new IllegalArgumentException("Password encoder secret 
not specified"))
-    PasswordEncoder.encrypting(new Password(encoderSecret),
-      null,
-      
encoderConfigs.getOrDefault(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
 PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
-      
Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG))
-        .map[Int](Integer.parseInt)
-        .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
-      
Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG))
-        .map[Int](Integer.parseInt)
-        .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT)
-    )
-  }
-
-  /**
-   * Pre-process broker configs provided to convert them to persistent format.
-   * Password configs are encrypted using the secret 
`PasswordEncoderConfigs.SECRET`.
-   * The secret is removed from `configsToBeAdded` and will not be persisted 
in ZooKeeper.
-   */
-  private def preProcessBrokerConfigs(configsToBeAdded: Properties, 
perBrokerConfig: Boolean): Unit = {
-    val passwordEncoderConfigs = new Properties
-    passwordEncoderConfigs ++= configsToBeAdded.asScala.filter { case (key, _) 
=> key.startsWith("password.encoder.") }
-    if (!passwordEncoderConfigs.isEmpty) {
-      info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be 
used for encrypting" +
-        " passwords, but will not be stored in ZooKeeper.")
-      passwordEncoderConfigs.asScala.keySet.foreach(configsToBeAdded.remove)
-    }
-
-    DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
-    val passwordConfigs = 
configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
-    if (passwordConfigs.nonEmpty) {
-      
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
-        s"${PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG} must be 
specified to update $passwordConfigs." +
-          " Other password encoder configs like cipher algorithm and 
iterations may also be specified" +
-          " to override the default encoding parameters. Password encoder 
configs will not be persisted" +
-          " in ZooKeeper."
-      )
-      val passwordConfigsMap = new java.util.HashMap[String, String]
-      passwordEncoderConfigs.forEach { (key, value) =>
-        passwordConfigsMap.put(key.toString, value.toString)
-      }
-      val passwordEncoder = createPasswordEncoder(passwordConfigsMap)
-      passwordConfigs.foreach { configName =>
-        val encodedValue = passwordEncoder.encode(new 
Password(configsToBeAdded.getProperty(configName)))
-        configsToBeAdded.setProperty(configName, encodedValue)
-      }
-    }
-  }
-
-  def describeConfigWithZk(zkClient: KafkaZkClient, opts: 
ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
-    val configEntity = parseEntity(opts)
-    val entityType = configEntity.root.entityType
-    val describeAllUsers = entityType == ConfigType.USER && 
configEntity.root.sanitizedName.isEmpty && configEntity.child.isEmpty
-    val entityName = configEntity.fullSanitizedName
-    val errorMessage = s"--bootstrap-server option must be specified to 
describe $entityType"
-    if (entityType == ConfigType.BROKER) {
-      // Dynamic broker configs can be described using ZooKeeper only if the 
corresponding broker is not running.
-      validateBrokersNotRunning(entityName, adminZkClient, zkClient, 
errorMessage)
-    }
-
-    val entities = configEntity.getAllEntities(zkClient)
-    for (entity <- entities) {
-      val configs = adminZkClient.fetchEntityConfig(entity.root.entityType, 
entity.fullSanitizedName)
-      // When describing all users, don't include empty user nodes with only 
<user, client> quota overrides.
-      if (!configs.isEmpty || !describeAllUsers) {
-        System.out.println("Configs for %s are %s"
-          .format(entity, configs.asScala.map(kv => kv._1 + "=" + 
kv._2).mkString(",")))
-      }
-    }
-  }
 
   @nowarn("cat=deprecation")
   def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
@@ -693,119 +498,8 @@ object ConfigCommand extends Logging {
     
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30,
 TimeUnit.SECONDS).asScala
   }
 
-  case class Entity(entityType: String, sanitizedName: Option[String]) {
-    val entityPath: String = sanitizedName match {
-      case Some(n) => entityType + "/" + n
-      case None => entityType
-    }
-    override def toString: String = {
-      val typeName = entityType match {
-        case ConfigType.USER => "user-principal"
-        case ConfigType.CLIENT => "client-id"
-        case ConfigType.TOPIC => "topic"
-        case ConfigType.GROUP => "group"
-        case t => t
-      }
-      sanitizedName match {
-        case Some(ZooKeeperInternals.DEFAULT_STRING) => "default " + typeName
-        case Some(n) =>
-          val desanitized = if (entityType == ConfigType.USER || entityType == 
ConfigType.CLIENT) Sanitizer.desanitize(n) else n
-          s"$typeName '$desanitized'"
-        case None => entityType
-      }
-    }
-  }
-
-  case class ConfigEntity(root: Entity, child: Option[Entity]) {
-    val fullSanitizedName: String = root.sanitizedName.getOrElse("") + 
child.map(s => "/" + s.entityPath).getOrElse("")
-
-    def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = {
-      // Describe option examples:
-      //   Describe entity with specified name:
-      //     --entity-type topics --entity-name topic1 (topic1)
-      //   Describe all entities of a type (topics/brokers/users/clients):
-      //     --entity-type topics (all topics)
-      //   Describe <user, client> quotas:
-      //     --entity-type users --entity-name user1 --entity-type clients 
--entity-name client2 (<user1, client2>)
-      //     --entity-type users --entity-name userA --entity-type clients 
(all clients of userA)
-      //     --entity-type users --entity-type clients (all <user, client>s))
-      //   Describe default quotas:
-      //     --entity-type users --entity-default (Default user)
-      //     --entity-type users --entity-default --entity-type clients 
--entity-default (Default <user, client>)
-      (root.sanitizedName, child) match {
-        case (None, _) =>
-          val rootEntities = zkClient.getAllEntitiesWithConfig(root.entityType)
-                                   .map(name => 
ConfigEntity(Entity(root.entityType, Some(name)), child))
-          child match {
-            case Some(s) =>
-                rootEntities.flatMap(rootEntity =>
-                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, 
None))).getAllEntities(zkClient))
-            case None => rootEntities
-          }
-        case (_, Some(childEntity)) =>
-          childEntity.sanitizedName match {
-            case Some(_) => Seq(this)
-            case None =>
-                zkClient.getAllEntitiesWithConfig(root.entityPath + "/" + 
childEntity.entityType)
-                       .map(name => ConfigEntity(root, 
Some(Entity(childEntity.entityType, Some(name)))))
-
-          }
-        case (_, None) =>
-          Seq(this)
-      }
-    }
-
-    override def toString: String = {
-      root.toString + child.map(s => ", " + s.toString).getOrElse("")
-    }
-  }
-
-  def parseEntity(opts: ConfigCommandOptions): ConfigEntity = {
-    val entityTypes = opts.entityTypes
-    val entityNames = opts.entityNames
-    if (entityTypes.head == ConfigType.USER || entityTypes.head == 
ConfigType.CLIENT)
-      parseClientQuotaEntity(opts, entityTypes, entityNames)
-    else {
-      // Exactly one entity type and at-most one entity name expected for 
other entities
-      val name = entityNames.headOption match {
-        case Some("") => Some(ZooKeeperInternals.DEFAULT_STRING)
-        case v => v
-      }
-      ConfigEntity(Entity(entityTypes.head, name), None)
-    }
-  }
-
-  private def parseClientQuotaEntity(opts: ConfigCommandOptions, types: 
List[String], names: List[String]): ConfigEntity = {
-    if (opts.options.has(opts.alterOpt) && names.size != types.size)
-      throw new IllegalArgumentException("--entity-name or --entity-default 
must be specified with each --entity-type for --alter")
-
-    val reverse = types.size == 2 && types.head == ConfigType.CLIENT
-    val entityTypes = if (reverse) types.reverse else types
-    val sortedNames = (if (reverse && names.length == 2) names.reverse else 
names).iterator
-
-    def sanitizeName(entityType: String, name: String) = {
-      if (name.isEmpty)
-        ZooKeeperInternals.DEFAULT_STRING
-      else {
-        entityType match {
-          case ConfigType.USER | ConfigType.CLIENT => Sanitizer.sanitize(name)
-          case _ => throw new IllegalArgumentException("Invalid entity type " 
+ entityType)
-        }
-      }
-    }
-
-    val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) 
Some(sanitizeName(t, sortedNames.next())) else None))
-    ConfigEntity(entities.head, entities.lift(1))
-  }
 
   class ConfigCommandOptions(args: Array[String]) extends 
CommandDefaultOptions(args) {
-
-    val zkConnectOpt: OptionSpec[String] = parser.accepts("zookeeper", 
"DEPRECATED. The connection string for the zookeeper connection in the form 
host:port. " +
-      "Multiple URLS can be given to allow fail-over. Required when 
configuring SCRAM credentials for users or " +
-      "dynamic broker configs when the relevant broker(s) are down. Not 
allowed otherwise.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
     val bootstrapServerOpt: OptionSpec[String] = 
parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
       .withRequiredArg
       .describedAs("server to connect to")
@@ -831,7 +525,7 @@ object ConfigCommand extends Logging {
       .ofType(classOf[String])
     private val entityDefault: OptionSpecBuilder = 
parser.accepts("entity-default", "Default entity name for 
clients/users/brokers/ips (applies to corresponding entity type)")
 
-    val nl: String = System.lineSeparator()
+    private val nl: String = System.lineSeparator()
     val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key 
Value pairs of configs to add. Square brackets can be used to group values 
which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of 
valid configurations: " +
       "For entity-type '" + ConfigType.TOPIC + "': " + 
LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
       "For entity-type '" + ConfigType.BROKER + "': " + 
DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, 
nl) +
@@ -879,10 +573,6 @@ object ConfigCommand extends Logging {
     val clientMetrics: OptionSpec[String] = parser.accepts("client-metrics", 
"The client metrics config resource name.")
       .withRequiredArg
       .ofType(classOf[String])
-    val zkTlsConfigFile: OptionSpec[String] = 
parser.accepts("zk-tls-config-file",
-      "Identifies the file where ZooKeeper client TLS connectivity properties 
are defined.  Any properties other than " +
-        
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(",
 ") + " are ignored.")
-      .withRequiredArg().describedAs("ZooKeeper TLS 
configuration").ofType(classOf[String])
     options = parser.parse(args : _*)
 
     private val entityFlags = List((topic, ConfigType.TOPIC),
@@ -930,10 +620,12 @@ object ConfigCommand extends Logging {
       if (entityTypeVals.size != entityTypeVals.distinct.size)
         throw new IllegalArgumentException(s"Duplicate entity type(s) 
specified: ${entityTypeVals.diff(entityTypeVals.distinct).mkString(",")}")
 
-      val (allowedEntityTypes, connectOptString) = if 
(options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt))
-        (BrokerSupportedConfigTypes, "--bootstrap-server or 
--bootstrap-controller")
-      else
-        (ZkSupportedConfigTypes, "--zookeeper")
+      val (allowedEntityTypes, connectOptString) =
+        if (options.has(bootstrapServerOpt) || 
options.has(bootstrapControllerOpt)) {
+          (BrokerSupportedConfigTypes, "--bootstrap-server or 
--bootstrap-controller")
+        } else {
+          throw new IllegalArgumentException("Either --bootstrap-server or 
--bootstrap-controller must be specified.")
+        }
 
       entityTypeVals.foreach(entityTypeVal =>
         if (!allowedEntityTypes.contains(entityTypeVal))
@@ -952,19 +644,9 @@ object ConfigCommand extends Logging {
       val hasEntityDefault = entityNames.exists(_.isEmpty)
 
       val numConnectOptions = (if (options.has(bootstrapServerOpt)) 1 else 0) +
-        (if (options.has(bootstrapControllerOpt)) 1 else 0) +
-        (if (options.has(zkConnectOpt)) 1 else 0)
-      if (numConnectOptions == 0)
-        throw new IllegalArgumentException("One of the required 
--bootstrap-server, --boostrap-controller, or --zookeeper arguments must be 
specified")
-      else if (numConnectOptions > 1)
-        throw new IllegalArgumentException("Only one of --bootstrap-server, 
--boostrap-controller, and --zookeeper can be specified")
-
-      if (options.has(allOpt) && options.has(zkConnectOpt)) {
-        throw new IllegalArgumentException(s"--bootstrap-server must be 
specified for --all")
-      }
-      if (options.has(zkTlsConfigFile) && !options.has(zkConnectOpt)) {
-        throw new IllegalArgumentException("Only the --zookeeper option can be 
used with the --zk-tls-config-file option.")
-      }
+        (if (options.has(bootstrapControllerOpt)) 1 else 0)
+      if (numConnectOptions > 1)
+        throw new IllegalArgumentException("Only one of --bootstrap-server or 
--bootstrap-controller can be specified")
       if (hasEntityName && (entityTypeVals.contains(ConfigType.BROKER) || 
entityTypeVals.contains(BrokerLoggerConfigType))) {
         Seq(entityName, broker, 
brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId 
=>
           try brokerId.toInt catch {
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 6d3cba6d246..74fe3271f23 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -16,10 +16,6 @@
  */
 package kafka.admin;
 
-import kafka.cluster.Broker;
-import kafka.zk.AdminZkClient;
-import kafka.zk.KafkaZkClient;
-
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterClientQuotasOptions;
 import org.apache.kafka.clients.admin.AlterClientQuotasResult;
@@ -44,12 +40,8 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
-import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Sanitizer;
 import org.apache.kafka.server.config.ConfigType;
-import org.apache.kafka.server.config.ZooKeeperInternals;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -59,7 +51,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.File;
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -75,7 +66,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -87,17 +77,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConfigCommandTest {
-    private static final String ZK_CONNECT = "localhost:2181";
-    private static final DummyAdminZkClient DUMMY_ADMIN_ZK_CLIENT = new 
DummyAdminZkClient(null);
-
-    private static final List<String> ZOOKEEPER_BOOTSTRAP = 
Arrays.asList("--zookeeper", ZK_CONNECT);
     private static final List<String> BROKER_BOOTSTRAP = 
Arrays.asList("--bootstrap-server", "localhost:9092");
     private static final List<String> CONTROLLER_BOOTSTRAP = 
Arrays.asList("--bootstrap-controller", "localhost:9093");
 
@@ -106,34 +91,6 @@ public class ConfigCommandTest {
         assertNonZeroStatusExit("--blah");
     }
 
-    @Test
-    public void shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity() {
-        assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
-            "--entity-type", "topics",
-            "--describe")));
-    }
-
-    @Test
-    public void shouldExitWithNonZeroStatusOnZkCommandWithClientsEntity() {
-        assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
-            "--entity-type", "clients",
-            "--describe")));
-    }
-
-    @Test
-    public void shouldExitWithNonZeroStatusOnZkCommandWithIpsEntity() {
-        assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
-            "--entity-type", "ips",
-            "--describe")));
-    }
-
-    @Test
-    public void shouldExitWithNonZeroStatusOnZkCommandWithGroupsEntity() {
-        assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
-            "--entity-type", "groups",
-            "--describe")));
-    }
-
     @Test
     public void shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName() {
         assertNonZeroStatusExit(toArray(BROKER_BOOTSTRAP, Arrays.asList(
@@ -155,15 +112,6 @@ public class ConfigCommandTest {
             "--describe", "--broker-defaults")));
     }
 
-    @Test
-    public void 
shouldExitWithNonZeroStatusOnBrokerCommandWithZkTlsConfigFile() {
-        assertNonZeroStatusExit(
-            "--bootstrap-server", "invalid host",
-            "--entity-type", "users",
-            "--zk-tls-config-file", "zk_tls_config.properties",
-            "--describe");
-    }
-
     public static void assertNonZeroStatusExit(String... args) {
         AtomicReference<Integer> exitStatus = new AtomicReference<>();
         Exit.setExitProcedure((status, __) -> {
@@ -183,11 +131,6 @@ public class ConfigCommandTest {
         assertEquals(1, exitStatus.get());
     }
 
-    @Test
-    public void shouldFailParseArgumentsForClientsEntityTypeUsingZookeeper() {
-        assertThrows(IllegalArgumentException.class, () -> 
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "clients"));
-    }
-
     @Test
     public void shouldParseArgumentsForClientsEntityTypeWithBrokerBootstrap() {
         testArgumentParse(BROKER_BOOTSTRAP, "clients");
@@ -198,11 +141,6 @@ public class ConfigCommandTest {
         testArgumentParse(CONTROLLER_BOOTSTRAP, "clients");
     }
 
-    @Test
-    public void shouldParseArgumentsForUsersEntityTypeUsingZookeeper() {
-        testArgumentParse(ZOOKEEPER_BOOTSTRAP, "users");
-    }
-
     @Test
     public void shouldParseArgumentsForUsersEntityTypeWithBrokerBootstrap() {
         testArgumentParse(BROKER_BOOTSTRAP, "users");
@@ -213,11 +151,6 @@ public class ConfigCommandTest {
         testArgumentParse(CONTROLLER_BOOTSTRAP, "users");
     }
 
-    @Test
-    public void shouldFailParseArgumentsForTopicsEntityTypeUsingZookeeper() {
-        assertThrows(IllegalArgumentException.class, () -> 
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "topics"));
-    }
-
     @Test
     public void shouldParseArgumentsForTopicsEntityTypeWithBrokerBootstrap() {
         testArgumentParse(BROKER_BOOTSTRAP, "topics");
@@ -228,11 +161,6 @@ public class ConfigCommandTest {
         testArgumentParse(CONTROLLER_BOOTSTRAP, "topics");
     }
 
-    @Test
-    public void shouldParseArgumentsForBrokersEntityTypeUsingZookeeper() {
-        testArgumentParse(ZOOKEEPER_BOOTSTRAP, "brokers");
-    }
-
     @Test
     public void shouldParseArgumentsForBrokersEntityTypeWithBrokerBootstrap() {
         testArgumentParse(BROKER_BOOTSTRAP, "brokers");
@@ -253,11 +181,6 @@ public class ConfigCommandTest {
         testArgumentParse(CONTROLLER_BOOTSTRAP, "broker-loggers");
     }
 
-    @Test
-    public void shouldFailParseArgumentsForIpEntityTypeUsingZookeeper() {
-        assertThrows(IllegalArgumentException.class, () -> 
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "ips"));
-    }
-
     @Test
     public void shouldParseArgumentsForIpEntityTypeWithBrokerBootstrap() {
         testArgumentParse(BROKER_BOOTSTRAP, "ips");
@@ -268,11 +191,6 @@ public class ConfigCommandTest {
         testArgumentParse(CONTROLLER_BOOTSTRAP, "ips");
     }
 
-    @Test
-    public void shouldFailParseArgumentsForGroupEntityTypeUsingZookeeper() {
-        assertThrows(IllegalArgumentException.class, () -> 
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "groups"));
-    }
-
     @Test
     public void shouldParseArgumentsForGroupEntityTypeWithBrokerBootstrap() {
         testArgumentParse(BROKER_BOOTSTRAP, "groups");
@@ -509,57 +427,35 @@ public class ConfigCommandTest {
         assertEquals(createOpts.entityNames().toSeq(), seq(expectedNames));
     }
 
-    public void doTestOptionEntityTypeNames(boolean zkConfig) {
-        List<String> connectOpts = zkConfig
-            ? Arrays.asList("--zookeeper", ZK_CONNECT)
-            : Arrays.asList("--bootstrap-server", "localhost:9092");
-
-        // zookeeper config only supports "users" and "brokers" entity type
-        if (!zkConfig) {
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), 
Collections.singletonList("A"), connectOpts, "--entity-type", "topics", 
"--entity-name", "A");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), 
Collections.singletonList("1.2.3.4"), connectOpts, "--entity-name", "1.2.3.4", 
"--entity-type", "ips");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS),
 Collections.singletonList("A"), connectOpts, "--entity-type", 
"client-metrics", "--entity-name", "A");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), 
Collections.singletonList("A"), connectOpts, "--entity-type", "groups", 
"--entity-name", "A");
-            testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER, 
ConfigType.CLIENT), Arrays.asList("A", ""), connectOpts,
-                "--entity-type", "users", "--entity-type", "clients", 
"--entity-name", "A", "--entity-default");
-            testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER, 
ConfigType.CLIENT), Arrays.asList("", "B"), connectOpts,
-                "--entity-default", "--entity-name", "B", "--entity-type", 
"users", "--entity-type", "clients");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), 
Collections.singletonList("A"), connectOpts, "--topic", "A");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), 
Collections.singletonList("1.2.3.4"), connectOpts, "--ip", "1.2.3.4");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), 
Collections.singletonList("A"), connectOpts, "--group", "A");
-            testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, 
ConfigType.USER), Arrays.asList("B", "A"), connectOpts, "--client", "B", 
"--user", "A");
-            testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, 
ConfigType.USER), Arrays.asList("B", ""), connectOpts, "--client", "B", 
"--user-defaults");
-            testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, 
ConfigType.USER), Collections.singletonList("A"), connectOpts,
-                "--entity-type", "clients", "--entity-type", "users", 
"--entity-name", "A");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), 
Collections.emptyList(), connectOpts, "--entity-type", "topics");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), 
Collections.emptyList(), connectOpts, "--entity-type", "ips");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), 
Collections.emptyList(), connectOpts, "--entity-type", "groups");
-            
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS),
 Collections.emptyList(), connectOpts, "--entity-type", "client-metrics");
-        }
-
+    @Test
+    public void testOptionEntityTypeNames() {
+        List<String> connectOpts = Arrays.asList("--bootstrap-server", "localhost:9092");
+
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), Collections.singletonList("A"), connectOpts, "--entity-type", "topics", "--entity-name", "A");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), Collections.singletonList("1.2.3.4"), connectOpts, "--entity-name", "1.2.3.4", "--entity-type", "ips");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS), Collections.singletonList("A"), connectOpts, "--entity-type", "client-metrics", "--entity-name", "A");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), Collections.singletonList("A"), connectOpts, "--entity-type", "groups", "--entity-name", "A");
+        testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER, ConfigType.CLIENT), Arrays.asList("A", ""), connectOpts,
+            "--entity-type", "users", "--entity-type", "clients", "--entity-name", "A", "--entity-default");
+        testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER, ConfigType.CLIENT), Arrays.asList("", "B"), connectOpts,
+            "--entity-default", "--entity-name", "B", "--entity-type", "users", "--entity-type", "clients");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), Collections.singletonList("A"), connectOpts, "--topic", "A");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), Collections.singletonList("1.2.3.4"), connectOpts, "--ip", "1.2.3.4");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), Collections.singletonList("A"), connectOpts, "--group", "A");
+        testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, ConfigType.USER), Arrays.asList("B", "A"), connectOpts, "--client", "B", "--user", "A");
+        testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, ConfigType.USER), Arrays.asList("B", ""), connectOpts, "--client", "B", "--user-defaults");
+        testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT, ConfigType.USER), Collections.singletonList("A"), connectOpts,
+            "--entity-type", "clients", "--entity-type", "users", "--entity-name", "A");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC), Collections.emptyList(), connectOpts, "--entity-type", "topics");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP), Collections.emptyList(), connectOpts, "--entity-type", "ips");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP), Collections.emptyList(), connectOpts, "--entity-type", "groups");
+        testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS), Collections.emptyList(), connectOpts, "--entity-type", "client-metrics");
         testExpectedEntityTypeNames(Collections.singletonList(ConfigType.BROKER), Collections.singletonList("0"), connectOpts, "--entity-name", "0", "--entity-type", "brokers");
         testExpectedEntityTypeNames(Collections.singletonList(ConfigType.BROKER), Collections.singletonList("0"), connectOpts, "--broker", "0");
         testExpectedEntityTypeNames(Collections.singletonList(ConfigType.USER), Collections.emptyList(), connectOpts, "--entity-type", "users");
         testExpectedEntityTypeNames(Collections.singletonList(ConfigType.BROKER), Collections.emptyList(), connectOpts, "--entity-type", "brokers");
     }
 
-    @Test
-    public void testOptionEntityTypeNamesUsingZookeeper() {
-        doTestOptionEntityTypeNames(true);
-    }
-
-    @Test
-    public void testOptionEntityTypeNames() {
-        doTestOptionEntityTypeNames(false);
-    }
-
-    @Test
-    public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "client", "--entity-type", "not-recognised", 
"--alter", "--add-config", "a=b,c=d"});
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldFailIfUnrecognisedEntityType() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
@@ -567,13 +463,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
     }
 
-    @Test
-    public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "A", "--entity-type", "brokers", "--alter", 
"--add-config", "a=b,c=d"});
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
@@ -581,13 +470,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
     }
 
-    @Test
-    public void 
shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--broker", "A", "--alter", "--add-config", "a=b,c=d"});
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
@@ -595,13 +477,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
     }
 
-    @Test
-    public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "A", "--entity-type", "users", "--client", "B", 
"--describe"});
-        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
-    }
-
     @Test
     public void shouldFailIfMixedEntityTypeFlags() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
@@ -616,13 +491,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
     }
 
-    @Test
-    public void shouldFailIfInvalidHostUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "A,B", "--entity-type", "ips", "--describe"});
-        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
-    }
-
     @Test
     public void shouldFailIfUnresolvableHost() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092",
@@ -630,69 +498,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
     }
 
-    @Test
-    public void shouldFailIfUnresolvableHostUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "RFC2606.invalid", "--entity-type", "ips", 
"--describe"});
-        assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
-    }
-
-    @Test
-    public void shouldAddClientConfigUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "my-client-id",
-            "--entity-type", "clients",
-            "--alter",
-            "--add-config", "a=b,c=d"});
-
-        KafkaZkClient zkClient = mock(KafkaZkClient.class);
-        when(zkClient.getEntityConfigs(anyString(), 
anyString())).thenReturn(new Properties());
-
-        class TestAdminZkClient extends AdminZkClient {
-            public TestAdminZkClient(KafkaZkClient zkClient) {
-                super(zkClient, scala.None$.empty());
-            }
-
-            @Override
-            public void changeClientIdConfig(String clientId, Properties 
configChange) {
-                assertEquals("my-client-id", clientId);
-                assertEquals("b", configChange.get("a"));
-                assertEquals("d", configChange.get("c"));
-            }
-        }
-
-        // Changing USER configs don't use `KafkaZkClient` so it safe to pass 
`null`.
-        ConfigCommand.alterConfigWithZk(null, createOpts, new 
TestAdminZkClient(zkClient));
-    }
-
-    @Test
-    public void shouldAddIpConfigsUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-name", "1.2.3.4",
-            "--entity-type", "ips",
-            "--alter",
-            "--add-config", "a=b,c=d"});
-
-        KafkaZkClient zkClient = mock(KafkaZkClient.class);
-        when(zkClient.getEntityConfigs(anyString(), 
anyString())).thenReturn(new Properties());
-
-        class TestAdminZkClient extends AdminZkClient {
-            public TestAdminZkClient(KafkaZkClient zkClient) {
-                super(zkClient, scala.None$.empty());
-            }
-
-            @Override
-            public void changeIpConfig(String ip, Properties configChange) {
-                assertEquals("1.2.3.4", ip);
-                assertEquals("b", configChange.get("a"));
-                assertEquals("d", configChange.get("c"));
-            }
-        }
-
-        // Changing USER configs don't use `KafkaZkClient` so it safe to pass 
`null`.
-        ConfigCommand.alterConfigWithZk(null, createOpts, new 
TestAdminZkClient(zkClient));
-    }
-
     private Entry<List<String>, Map<String, String>> argsAndExpectedEntity(Optional<String> entityName, String entityType) {
         String command;
         switch (entityType) {
@@ -983,27 +788,6 @@ public class ConfigCommandTest {
         verifyUserScramCredentialsNotDescribed(defaultUserOpt);
     }
 
-    @Test
-    public void shouldAddTopicConfigUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "my-topic",
-            "--entity-type", "topics",
-            "--alter",
-            "--add-config", "a=b,c=d"));
-
-        KafkaZkClient zkClient = mock(KafkaZkClient.class);
-        when(zkClient.getEntityConfigs(anyString(), 
anyString())).thenReturn(new Properties());
-
-        ConfigCommand.alterConfigWithZk(null, createOpts, new 
AdminZkClient(zkClient, scala.None$.empty()) {
-            @Override
-            public void changeTopicConfig(String topic, Properties 
configChange) {
-                assertEquals("my-topic", topic);
-                assertEquals("b", configChange.get("a"));
-                assertEquals("d", configChange.get("c"));
-            }
-        });
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void shouldAlterTopicConfig(boolean file) {
@@ -1114,64 +898,6 @@ public class ConfigCommandTest {
         verify(describeResult).all();
     }
 
-    @Test
-    public void 
shouldNotAllowAddBrokerQuotaConfigWhileBrokerUpUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1",
-            "--entity-type", "brokers",
-            "--alter",
-            "--add-config", 
"leader.replication.throttled.rate=10,follower.replication.throttled.rate=20"));
-
-        KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
-        Broker mockBroker = mock(Broker.class);
-        
when(mockZkClient.getBroker(1)).thenReturn(scala.Option.apply(mockBroker));
-
-        assertThrows(IllegalArgumentException.class,
-            () -> ConfigCommand.alterConfigWithZk(mockZkClient, alterOpts, 
DUMMY_ADMIN_ZK_CLIENT));
-    }
-
-    @Test
-    public void shouldNotAllowDescribeBrokerWhileBrokerUpUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1",
-            "--entity-type", "brokers",
-            "--describe"));
-
-        KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
-        Broker mockBroker = mock(Broker.class);
-        
when(mockZkClient.getBroker(1)).thenReturn(scala.Option.apply(mockBroker));
-
-        assertThrows(IllegalArgumentException.class,
-            () -> ConfigCommand.describeConfigWithZk(mockZkClient, 
describeOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
-    @Test
-    public void shouldSupportDescribeBrokerBeforeBrokerUpUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1",
-            "--entity-type", "brokers",
-            "--describe"));
-
-        class TestAdminZkClient extends AdminZkClient {
-            public TestAdminZkClient(KafkaZkClient zkClient) {
-                super(zkClient, scala.None$.empty());
-            }
-
-            @Override
-            public Properties fetchEntityConfig(String rootEntityType, String 
sanitizedEntityName) {
-                assertEquals("brokers", rootEntityType);
-                assertEquals("1", sanitizedEntityName);
-
-                return new Properties();
-            }
-        }
-
-        KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
-        when(mockZkClient.getBroker(1)).thenReturn(scala.None$.empty());
-
-        ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new 
TestAdminZkClient(null));
-    }
-
     @Test
     public void shouldAddBrokerLoggerConfig() {
         Node node = new Node(1, "localhost", 9092);
@@ -1182,16 +908,6 @@ public class ConfigCommandTest {
         ));
     }
 
-    @Test
-    public void testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed() {
-        String[] optsList = new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-type", ConfigType.BROKER,
-            "--describe"
-        };
-
-        new ConfigCommand.ConfigCommandOptions(optsList).checkArgs();
-    }
-
     @Test
     public void testNoSpecifiedEntityOptionWithDescribeBrokersInBootstrapServerIsAllowed() {
         String[] optsList = new String[]{"--bootstrap-server", "localhost:9092",
@@ -1224,17 +940,6 @@ public class ConfigCommandTest {
         new ConfigCommand.ConfigCommandOptions(optsList).checkArgs();
     }
 
-    @Test
-    public void testDescribeAllBrokerConfigBootstrapServerRequired() {
-        String[] optsList = new String[]{"--zookeeper", ZK_CONNECT,
-            "--entity-type", ConfigType.BROKER,
-            "--entity-name", "1",
-            "--describe",
-            "--all"};
-
-        assertThrows(IllegalArgumentException.class, () -> new 
ConfigCommand.ConfigCommandOptions(optsList).checkArgs());
-    }
-
     @Test
     public void testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed() {
         String[] optsList = new String[]{"--bootstrap-server", "localhost:9092",
@@ -1428,44 +1133,6 @@ public class ConfigCommandTest {
         verify(describeResult).all();
     }
 
-    @Test
-    public void shouldSupportCommaSeparatedValuesUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "my-topic",
-            "--entity-type", "topics",
-            "--alter",
-            "--add-config", "a=b,c=[d,e ,f],g=[h,i]"));
-
-        KafkaZkClient zkClient = mock(KafkaZkClient.class);
-        when(zkClient.getEntityConfigs(anyString(), 
anyString())).thenReturn(new Properties());
-
-        class TestAdminZkClient extends AdminZkClient {
-            public TestAdminZkClient(KafkaZkClient zkClient) {
-                super(zkClient, scala.None$.empty());
-            }
-
-            @Override
-            public void changeTopicConfig(String topic, Properties 
configChange) {
-                assertEquals("my-topic", topic);
-                assertEquals("b", configChange.get("a"));
-                assertEquals("d,e ,f", configChange.get("c"));
-                assertEquals("h,i", configChange.get("g"));
-            }
-        }
-
-        ConfigCommand.alterConfigWithZk(null, createOpts, new 
TestAdminZkClient(zkClient));
-    }
-
-    @Test
-    public void 
shouldNotUpdateBrokerConfigIfMalformedEntityNameUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1,2,3", //Don't support multiple brokers 
currently
-            "--entity-type", "brokers",
-            "--alter",
-            "--add-config", "leader.replication.throttled.rate=10"));
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldNotUpdateBrokerConfigIfMalformedEntityName() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
@@ -1476,16 +1143,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
     }
 
-    @Test
-    public void shouldNotUpdateBrokerConfigIfMalformedConfigUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1",
-            "--entity-type", "brokers",
-            "--alter",
-            "--add-config", "a=="));
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldNotUpdateBrokerConfigIfMalformedConfig() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
@@ -1496,16 +1153,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
     }
 
-    @Test
-    public void 
shouldNotUpdateBrokerConfigIfMalformedBracketConfigUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1",
-            "--entity-type", "brokers",
-            "--alter",
-            "--add-config", "a=[b,c,d=e"));
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
@@ -1516,16 +1163,6 @@ public class ConfigCommandTest {
         assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts));
     }
 
-    @Test
-    public void 
shouldNotUpdateConfigIfNonExistingConfigIsDeletedUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "my-topic",
-            "--entity-type", "topics",
-            "--alter",
-            "--delete-config", "missing_config1, missing_config2"));
-        assertThrows(InvalidConfigurationException.class, () -> 
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
-    }
-
     @Test
     public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
         String resourceName = "my-topic";
@@ -1558,308 +1195,6 @@ public class ConfigCommandTest {
         verify(describeResult).all();
     }
 
-    @Test
-    public void shouldNotDeleteBrokerConfigWhileBrokerUpUsingZookeeper() {
-        ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "1",
-            "--entity-type", "brokers",
-            "--alter",
-            "--delete-config", "a,c"));
-
-        class TestAdminZkClient extends AdminZkClient {
-            public TestAdminZkClient(KafkaZkClient zkClient) {
-                super(zkClient, scala.None$.empty());
-            }
-
-            @Override
-            public Properties fetchEntityConfig(String rootEntityType, String 
sanitizedEntityName) {
-                Properties properties = new Properties();
-                properties.put("a", "b");
-                properties.put("c", "d");
-                properties.put("e", "f");
-                return properties;
-            }
-
-            @Override
-            public void changeBrokerConfig(Seq<Object> brokers, Properties 
configChange) {
-                assertEquals("f", configChange.get("e"));
-                assertEquals(1, configChange.size());
-            }
-        }
-
-        KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
-        Broker mockBroker = mock(Broker.class);
-        
when(mockZkClient.getBroker(1)).thenReturn(scala.Option.apply(mockBroker));
-
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(mockZkClient, createOpts, new 
TestAdminZkClient(null)));
-    }
-
-    private ConfigCommand.ConfigCommandOptions createOpts(String user, String 
config) {
-        return new ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", 
ZK_CONNECT,
-            "--entity-name", user,
-            "--entity-type", "users",
-            "--alter",
-            "--add-config", config));
-    }
-
-    private ConfigCommand.ConfigCommandOptions deleteOpts(String user, String 
mechanism) {
-        return new ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", 
ZK_CONNECT,
-            "--entity-name", user,
-            "--entity-type", "users",
-            "--alter",
-            "--delete-config", mechanism));
-    }
-
-    @Test
-    public void testScramCredentials() {
-        Map<String, Properties> credentials = new HashMap<>();
-        class CredentialChange extends AdminZkClient {
-            private final String user;
-            private final Set<String> mechanisms;
-            private final int iterations;
-
-            public CredentialChange(String user, Set<String> mechanisms, int 
iterations) {
-                super(null, scala.None$.empty());
-                this.user = user;
-                this.mechanisms = mechanisms;
-                this.iterations = iterations;
-            }
-
-            @Override
-            public Properties fetchEntityConfig(String entityType, String 
entityName) {
-                return credentials.getOrDefault(entityName, new Properties());
-            }
-
-            @Override
-            public void changeUserOrUserClientIdConfig(String 
sanitizedEntityName, Properties configChange, boolean isUserClientId) {
-                assertEquals(user, sanitizedEntityName);
-                assertEquals(mechanisms, configChange.keySet());
-                for (String mechanism : mechanisms) {
-                    String value = configChange.getProperty(mechanism);
-                    assertEquals(-1, value.indexOf("password="));
-                    ScramCredential scramCredential = 
ScramCredentialUtils.credentialFromString(value);
-                    if (iterations != scramCredential.iterations())
-                        
System.out.println("CredentialChange.changeUserOrUserClientIdConfig");
-                    assertEquals(iterations, scramCredential.iterations());
-                    credentials.put(user, configChange);
-                }
-            }
-        }
-        ConfigCommand.ConfigCommandOptions optsA = createOpts("userA", 
"SCRAM-SHA-256=[iterations=8192,password=abc, def]");
-        ConfigCommand.alterConfigWithZk(null, optsA, new 
CredentialChange("userA", Collections.singleton("SCRAM-SHA-256"), 8192));
-        ConfigCommand.ConfigCommandOptions optsB = createOpts("userB", 
"SCRAM-SHA-256=[iterations=4096,password=abc, 
def],SCRAM-SHA-512=[password=1234=abc]");
-        ConfigCommand.alterConfigWithZk(null, optsB, new 
CredentialChange("userB", new HashSet<>(Arrays.asList("SCRAM-SHA-256", 
"SCRAM-SHA-512")), 4096));
-
-        ConfigCommand.ConfigCommandOptions del256 = deleteOpts("userB", 
"SCRAM-SHA-256");
-        ConfigCommand.alterConfigWithZk(null, del256, new 
CredentialChange("userB", Collections.singleton("SCRAM-SHA-512"), 4096));
-        ConfigCommand.ConfigCommandOptions del512 = deleteOpts("userB", 
"SCRAM-SHA-512");
-        ConfigCommand.alterConfigWithZk(null, del512, new 
CredentialChange("userB", Collections.emptySet(), 4096));
-    }
-
-    @Test
-    public void testQuotaConfigEntityUsingZookeeperNotAllowed() {
-        assertThrows(IllegalArgumentException.class, () -> 
doTestQuotaConfigEntity(true));
-    }
-
-    private List<String> connectOpts;
-
-    private ConfigCommand.ConfigCommandOptions createOpts(String entityType, 
Optional<String> entityName, List<String> otherArgs) {
-        List<String> optArray = Arrays.asList(connectOpts.get(0), 
connectOpts.get(1), "--entity-type", entityType);
-        List<String> nameArray = entityName
-            .map(s -> Arrays.asList("--entity-name", s))
-            .orElse(Collections.emptyList());
-        return new ConfigCommand.ConfigCommandOptions(toArray(optArray, 
nameArray, otherArgs));
-    }
-
-    private void checkEntity(String entityType, Optional<String> entityName, 
String expectedEntityName, List<String> otherArgs) {
-        ConfigCommand.ConfigCommandOptions opts = createOpts(entityType, 
entityName, otherArgs);
-        opts.checkArgs();
-        ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(opts);
-        assertEquals(entityType, entity.root().entityType());
-        assertEquals(expectedEntityName, entity.fullSanitizedName());
-    }
-
-    private void checkInvalidArgs(String entityType, Optional<String> 
entityName, List<String> otherArgs) {
-        ConfigCommand.ConfigCommandOptions opts = createOpts(entityType, 
entityName, otherArgs);
-        assertThrows(IllegalArgumentException.class, opts::checkArgs);
-    }
-
-    private void checkInvalidEntity(String entityType, Optional<String> 
entityName, List<String> otherArgs) {
-        ConfigCommand.ConfigCommandOptions opts = createOpts(entityType, 
entityName, otherArgs);
-        opts.checkArgs();
-        assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.parseEntity(opts));
-    }
-
-    public void doTestQuotaConfigEntity(boolean zkConfig) {
-        connectOpts = zkConfig
-            ? Arrays.asList("--zookeeper", ZK_CONNECT)
-            : Arrays.asList("--bootstrap-server", "localhost:9092");
-
-        List<String> describeOpts = Collections.singletonList("--describe");
-        List<String> alterOpts = Arrays.asList("--alter", "--add-config", 
"a=b,c=d");
-
-        // <client-id> quota
-        String clientId = "client-1";
-        for (List<String> opts: Arrays.asList(describeOpts, alterOpts)) {
-            checkEntity("clients", Optional.of(clientId), clientId, opts);
-            checkEntity("clients", Optional.of(""), 
ZooKeeperInternals.DEFAULT_STRING, opts);
-        }
-        checkEntity("clients", Optional.empty(), "", describeOpts);
-        checkInvalidArgs("clients", Optional.empty(), alterOpts);
-
-        // <user> quota
-        String principal = "CN=ConfigCommandTest,O=Apache,L=<default>";
-        String sanitizedPrincipal = Sanitizer.sanitize(principal);
-        assertEquals(-1, sanitizedPrincipal.indexOf('='));
-        assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
-        for (List<String> opts: Arrays.asList(describeOpts, alterOpts)) {
-            checkEntity("users", Optional.of(principal), sanitizedPrincipal, 
opts);
-            checkEntity("users", Optional.of(""), 
ZooKeeperInternals.DEFAULT_STRING, opts);
-        }
-        checkEntity("users", Optional.empty(), "", describeOpts);
-        checkInvalidArgs("users", Optional.empty(), alterOpts);
-
-        // <user, client-id> quota
-        String userClient = sanitizedPrincipal + "/clients/" + clientId;
-        Function<String, List<String>> clientIdOpts = name -> 
Arrays.asList("--entity-type", "clients", "--entity-name", name);
-        for (List<String> opts : Arrays.asList(describeOpts, alterOpts)) {
-            checkEntity("users", Optional.of(principal), userClient, 
concat(opts, clientIdOpts.apply(clientId)));
-            checkEntity("users", Optional.of(principal), sanitizedPrincipal + 
"/clients/" + ZooKeeperInternals.DEFAULT_STRING, concat(opts, 
clientIdOpts.apply("")));
-            checkEntity("users", Optional.of(""), 
ZooKeeperInternals.DEFAULT_STRING + "/clients/" + clientId, 
concat(describeOpts, clientIdOpts.apply(clientId)));
-            checkEntity("users", Optional.of(""), 
ZooKeeperInternals.DEFAULT_STRING + "/clients/" + 
ZooKeeperInternals.DEFAULT_STRING, concat(opts, clientIdOpts.apply("")));
-        }
-        checkEntity("users", Optional.of(principal), sanitizedPrincipal + 
"/clients", concat(describeOpts, Arrays.asList("--entity-type", "clients")));
-        // Both user and client-id must be provided for alter
-        checkInvalidEntity("users", Optional.of(principal), concat(alterOpts, 
Arrays.asList("--entity-type", "clients")));
-        checkInvalidEntity("users", Optional.empty(), concat(alterOpts, 
clientIdOpts.apply(clientId)));
-        checkInvalidArgs("users", Optional.empty(), concat(alterOpts, 
Arrays.asList("--entity-type", "clients")));
-    }
-
-    @Test
-    public void testQuotaConfigEntity() {
-        doTestQuotaConfigEntity(false);
-    }
-
-    @Test
-    public void testUserClientQuotaOptsUsingZookeeperNotAllowed() {
-        assertThrows(IllegalArgumentException.class, () -> 
doTestUserClientQuotaOpts(true));
-    }
-
-    private void checkEntity(String expectedEntityType, String 
expectedEntityName, String...args) {
-        ConfigCommand.ConfigCommandOptions opts = new 
ConfigCommand.ConfigCommandOptions(toArray(connectOpts, Arrays.asList(args)));
-        opts.checkArgs();
-        ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(opts);
-        assertEquals(expectedEntityType, entity.root().entityType());
-        assertEquals(expectedEntityName, entity.fullSanitizedName());
-    }
-
-    private void doTestUserClientQuotaOpts(boolean zkConfig) {
-        connectOpts = zkConfig
-            ? Arrays.asList("--zookeeper", ZK_CONNECT)
-            : Arrays.asList("--bootstrap-server", "localhost:9092");
-
-        // <default> is a valid user principal and client-id (can be handled 
with URL-encoding),
-        checkEntity("users", Sanitizer.sanitize("<default>"),
-            "--entity-type", "users", "--entity-name", "<default>",
-            "--alter", "--add-config", "a=b,c=d");
-        checkEntity("clients", Sanitizer.sanitize("<default>"),
-            "--entity-type", "clients", "--entity-name", "<default>",
-            "--alter", "--add-config", "a=b,c=d");
-
-        checkEntity("users", Sanitizer.sanitize("CN=user1") + 
"/clients/client1",
-            "--entity-type", "users", "--entity-name", "CN=user1", 
"--entity-type", "clients", "--entity-name", "client1",
-            "--alter", "--add-config", "a=b,c=d");
-        checkEntity("users", Sanitizer.sanitize("CN=user1") + 
"/clients/client1",
-            "--entity-name", "CN=user1", "--entity-type", "users", 
"--entity-name", "client1", "--entity-type", "clients",
-            "--alter", "--add-config", "a=b,c=d");
-        checkEntity("users", Sanitizer.sanitize("CN=user1") + 
"/clients/client1",
-            "--entity-type", "clients", "--entity-name", "client1", 
"--entity-type", "users", "--entity-name", "CN=user1",
-            "--alter", "--add-config", "a=b,c=d");
-        checkEntity("users", Sanitizer.sanitize("CN=user1") + 
"/clients/client1",
-            "--entity-name", "client1", "--entity-type", "clients", 
"--entity-name", "CN=user1", "--entity-type", "users",
-            "--alter", "--add-config", "a=b,c=d");
-        checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients",
-            "--entity-type", "clients", "--entity-name", "CN=user1", 
"--entity-type", "users",
-            "--describe");
-        checkEntity("users", "/clients",
-            "--entity-type", "clients", "--entity-type", "users",
-            "--describe");
-        checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/" + 
Sanitizer.sanitize("client1?@%"),
-            "--entity-name", "client1?@%", "--entity-type", "clients", 
"--entity-name", "CN=user1", "--entity-type", "users",
-            "--alter", "--add-config", "a=b,c=d");
-    }
-
-    @Test
-    public void testUserClientQuotaOpts() {
-        doTestUserClientQuotaOpts(false);
-    }
-
-    private final KafkaZkClient zkClient = mock(KafkaZkClient.class);
-
-    public void checkEntities(List<String> opts, Map<String, List<String>> 
expectedFetches, List<String> expectedEntityNames) {
-        ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(new 
ConfigCommand.ConfigCommandOptions(toArray(opts, 
Collections.singletonList("--describe"))));
-        expectedFetches.forEach((name, values) ->
-            
when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(seq(values)));
-        Seq<ConfigCommand.ConfigEntity> entities0 = 
entity.getAllEntities(zkClient);
-        List<ConfigCommand.ConfigEntity> entities = new ArrayList<>();
-        entities0.foreach(e -> {
-            entities.add(e);
-            return null;
-        });
-        assertEquals(
-            expectedEntityNames,
-            
entities.stream().map(ConfigCommand.ConfigEntity::fullSanitizedName).collect(Collectors.toList()));
-    }
-
-    @Test
-    public void testQuotaDescribeEntities() {
-        String clientId = "a-client";
-        String principal = "CN=ConfigCommandTest.testQuotaDescribeEntities , 
O=Apache, L=<default>";
-        String sanitizedPrincipal = Sanitizer.sanitize(principal);
-        String userClient = sanitizedPrincipal + "/clients/" + clientId;
-
-        List<String> opts = Arrays.asList("--entity-type", "clients", 
"--entity-name", clientId);
-        checkEntities(opts, Collections.emptyMap(), 
Collections.singletonList(clientId));
-
-        opts = Arrays.asList("--entity-type", "clients", "--entity-default");
-        checkEntities(opts, Collections.emptyMap(), 
Collections.singletonList("<default>"));
-
-        opts = Arrays.asList("--entity-type", "clients");
-        checkEntities(opts, Collections.singletonMap("clients", 
Collections.singletonList(clientId)), Collections.singletonList(clientId));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-name", 
principal);
-        checkEntities(opts, Collections.emptyMap(), 
Collections.singletonList(sanitizedPrincipal));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-default");
-        checkEntities(opts, Collections.emptyMap(), 
Collections.singletonList("<default>"));
-
-        opts = Arrays.asList("--entity-type", "users");
-        checkEntities(opts, Collections.singletonMap("users", 
Arrays.asList("<default>", sanitizedPrincipal)), Arrays.asList("<default>", 
sanitizedPrincipal));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-name", 
principal, "--entity-type", "clients", "--entity-name", clientId);
-        checkEntities(opts, Collections.emptyMap(), 
Collections.singletonList(userClient));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-name", 
principal, "--entity-type", "clients", "--entity-default");
-        checkEntities(opts, Collections.emptyMap(), 
Collections.singletonList(sanitizedPrincipal + "/clients/<default>"));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-name", 
principal, "--entity-type", "clients");
-        checkEntities(opts,
-            Collections.singletonMap("users/" + sanitizedPrincipal + 
"/clients", Collections.singletonList("client-4")),
-            Collections.singletonList(sanitizedPrincipal + 
"/clients/client-4"));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-default", 
"--entity-type", "clients");
-        checkEntities(opts,
-            Collections.singletonMap("users/<default>/clients", 
Collections.singletonList("client-5")),
-            Collections.singletonList("<default>/clients/client-5"));
-
-        opts = Arrays.asList("--entity-type", "users", "--entity-type", 
"clients");
-        Map<String, List<String>> userMap = Collections.singletonMap("users/" 
+ sanitizedPrincipal + "/clients", Collections.singletonList("client-2"));
-        Map<String, List<String>> defaultUserMap = 
Collections.singletonMap("users/<default>/clients", 
Collections.singletonList("client-3"));
-        checkEntities(opts,
-            concat(Collections.singletonMap("users", 
Arrays.asList("<default>", sanitizedPrincipal)), defaultUserMap, userMap),
-            Arrays.asList("<default>/clients/client-3", sanitizedPrincipal + 
"/clients/client-2"));
-    }
-
     @Test
     public void shouldAlterClientMetricsConfig() {
         Node node = new Node(1, "localhost", 9092);
@@ -1972,67 +1307,6 @@ public class ConfigCommandTest {
         assertEquals("An entity name must be specified with --alter of 
client-metrics", exception.getMessage());
     }
 
-    @Test
-    public void shouldNotSupportAlterClientMetricsWithZookeeperArg() {
-        ConfigCommand.ConfigCommandOptions alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "sub",
-            "--entity-type", "client-metrics",
-            "--alter",
-            "--add-config", "interval.ms=1000"));
-
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, alterOpts::checkArgs);
-        assertEquals("Invalid entity type client-metrics, the entity type must 
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-
-        // Test for the --client-metrics alias
-        alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-                "--client-metrics", "sub",
-                "--alter",
-                "--add-config", "interval.ms=1000"));
-
-        exception = assertThrows(IllegalArgumentException.class, 
alterOpts::checkArgs);
-        assertEquals("Invalid entity type client-metrics, the entity type must 
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-    }
-
-    @Test
-    public void shouldNotSupportDescribeClientMetricsWithZookeeperArg() {
-        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "sub",
-            "--entity-type", "client-metrics",
-            "--describe"));
-
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, describeOpts::checkArgs);
-        assertEquals("Invalid entity type client-metrics, the entity type must 
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-
-        // Test for the --client-metrics alias
-        describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-                "--client-metrics", "sub",
-                "--describe"));
-
-        exception = assertThrows(IllegalArgumentException.class, 
describeOpts::checkArgs);
-        assertEquals("Invalid entity type client-metrics, the entity type must 
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-    }
-
-    @Test
-    public void shouldNotSupportAlterClientMetricsWithZookeeper() {
-        ConfigCommand.ConfigCommandOptions alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "sub",
-            "--entity-type", "client-metrics",
-            "--alter",
-            "--add-config", "interval.ms=1000"));
-
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, alterOpts::checkArgs);
-        assertEquals("Invalid entity type client-metrics, the entity type must 
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-
-        // Test for the --client-metrics alias
-        alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-                "--client-metrics", "sub",
-                "--alter",
-                "--add-config", "interval.ms=1000"));
-
-        exception = assertThrows(IllegalArgumentException.class, 
alterOpts::checkArgs);
-        assertEquals("Invalid entity type client-metrics, the entity type must 
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-    }
-
     @Test
     public void shouldAlterGroupConfig() {
         Node node = new Node(1, "localhost", 9092);
@@ -2146,70 +1420,6 @@ public class ConfigCommandTest {
         assertEquals("An entity name must be specified with --alter of 
groups", exception.getMessage());
     }
 
-    @Test
-    public void shouldNotSupportAlterGroupConfigWithZookeeperArg() {
-        ConfigCommand.ConfigCommandOptions alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "group",
-            "--entity-type", "groups",
-            "--alter",
-            "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, alterOpts::checkArgs);
-        assertEquals("Invalid entity type groups, the entity type must be one 
of users, brokers with a --zookeeper argument", exception.getMessage());
-
-        // Test for the --group alias
-        alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--group", "group",
-            "--alter",
-            "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
-        exception = assertThrows(IllegalArgumentException.class, 
alterOpts::checkArgs);
-        assertEquals("Invalid entity type groups, the entity type must be one 
of users, brokers with a --zookeeper argument", exception.getMessage());
-
-    }
-
-    @Test
-    public void shouldNotSupportDescribeGroupConfigWithZookeeperArg() {
-        ConfigCommand.ConfigCommandOptions describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "group",
-            "--entity-type", "groups",
-            "--describe"));
-
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, describeOpts::checkArgs);
-        assertEquals("Invalid entity type groups, the entity type must be one 
of users, brokers with a --zookeeper argument", exception.getMessage());
-
-        // Test for the --group alias
-        describeOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--group", "group",
-            "--describe"));
-
-        exception = assertThrows(IllegalArgumentException.class, 
describeOpts::checkArgs);
-        assertEquals("Invalid entity type groups, the entity type must be one 
of users, brokers with a --zookeeper argument", exception.getMessage());
-
-    }
-
-    @Test
-    public void shouldNotSupportAlterGroupConfigWithZookeeper() {
-        ConfigCommand.ConfigCommandOptions alterOpts = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--entity-name", "group",
-            "--entity-type", "groups",
-            "--alter",
-            "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, alterOpts, DUMMY_ADMIN_ZK_CLIENT));
-        assertEquals("groups is not a known entityType. Should be one of 
List(topics, clients, users, brokers, ips)", exception.getMessage());
-
-        // Test for the --group alias
-        ConfigCommand.ConfigCommandOptions alterOptsUsingAlias = new 
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
-            "--group", "group",
-            "--alter",
-            "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
-        exception = assertThrows(IllegalArgumentException.class, () -> 
ConfigCommand.alterConfigWithZk(null, alterOptsUsingAlias, 
DUMMY_ADMIN_ZK_CLIENT));
-        assertEquals("groups is not a known entityType. Should be one of 
List(topics, clients, users, brokers, ips)", exception.getMessage());
-
-    }
-
     public static String[] toArray(String... first) {
         return first;
     }
@@ -2234,32 +1444,6 @@ public class ConfigCommandTest {
         return res;
     }
 
-    static class DummyAdminZkClient extends AdminZkClient {
-        public DummyAdminZkClient(KafkaZkClient zkClient) {
-            super(zkClient, scala.None$.empty());
-        }
-
-        @Override
-        public void changeBrokerConfig(Seq<Object> brokers, Properties 
configs) {
-        }
-
-        @Override
-        public Properties fetchEntityConfig(String rootEntityType, String 
sanitizedEntityName) {
-            return new Properties();
-        }
-
-        @Override
-        public void changeClientIdConfig(String sanitizedClientId, Properties 
configs) {
-        }
-
-        @Override
-        public void changeUserOrUserClientIdConfig(String sanitizedEntityName, 
Properties configs, boolean isUserClientId) {
-        }
-
-        @Override
-        public void changeTopicConfig(String topic, Properties configs) {
-        }
-    }
 
     static class DummyAdminClient extends MockAdminClient {
         public DummyAdminClient(Node node) {

Reply via email to