This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c92f16a727e KAFKA-17787: Removed --zookeeper option and logic from
ConfigCommand (#17507)
c92f16a727e is described below
commit c92f16a727ebb44b0c44038fbfa254d425dffcae
Author: Chengyan <[email protected]>
AuthorDate: Tue Nov 12 01:50:49 2024 +0800
KAFKA-17787: Removed --zookeeper option and logic from ConfigCommand
(#17507)
Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai
<[email protected]>
Co-authored-by: Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 352 +--------
.../test/java/kafka/admin/ConfigCommandTest.java | 862 +--------------------
2 files changed, 40 insertions(+), 1174 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 97c727826a9..993a2abe5c4 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -19,27 +19,21 @@ package kafka.admin
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
-import java.util.{Collections, Optional, Properties}
+import java.util.{Collections, Properties}
import joptsimple._
-import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
+import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
-import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo,
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig,
ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils,
ScramFormatter, ScramMechanism}
-import org.apache.kafka.common.utils.{Exit, Sanitizer, Time, Utils}
-import org.apache.kafka.server.config.{ConfigType, QuotaConfig, ZkConfigs,
ZooKeeperInternals}
-import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.utils.{Exit, Utils}
+import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
-import org.apache.zookeeper.client.ZKClientConfig
-
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._
@@ -63,24 +57,12 @@ import scala.collection._
* when describing or altering default configuration for users, clients,
brokers, or ips, respectively.
* Alternatively, --user-defaults, --client-defaults, --broker-defaults, or
--ip-defaults may be specified in place of
* --entity-type <users|clients|brokers|ips> --entity-default, respectively.
- *
- * For most use cases, this script communicates with a kafka cluster
(specified via the
- * `--bootstrap-server` option). There are three exceptions where direct
communication with a
- * ZooKeeper ensemble (specified via the `--zookeeper` option) is allowed:
- *
- * 1. Describe/alter user configs where the config is a SCRAM mechanism name
(i.e. a SCRAM credential for a user)
- * 2. Describe/alter broker configs for a particular broker when that broker
is down
- * 3. Describe/alter broker default configs when all brokers are down
- *
- * For example, this allows password configs to be stored encrypted in ZK
before brokers are started,
- * avoiding cleartext passwords in `server.properties`.
*/
object ConfigCommand extends Logging {
- val BrokerDefaultEntityName = ""
+ private val BrokerDefaultEntityName = ""
val BrokerLoggerConfigType = "broker-loggers"
private val BrokerSupportedConfigTypes = ConfigType.ALL.asScala :+
BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS :+ ConfigType.GROUP
- private val ZkSupportedConfigTypes = Seq(ConfigType.USER, ConfigType.BROKER)
private val DefaultScramIterations = 4096
def main(args: Array[String]): Unit = {
@@ -91,14 +73,7 @@ object ConfigCommand extends Logging {
"This tool helps to manipulate and describe entity config for a topic,
client, user, broker, ip, client-metrics or group")
opts.checkArgs()
-
- if (opts.options.has(opts.zkConnectOpt)) {
- System.out.println(s"Warning: --zookeeper is deprecated and will be
removed in a future version of Kafka.")
- System.out.println(s"Use --bootstrap-server instead to specify a
broker to connect to.")
- processCommandWithZk(opts.options.valueOf(opts.zkConnectOpt), opts)
- } else {
- processCommand(opts)
- }
+ processCommand(opts)
} catch {
case e @ (_: IllegalArgumentException | _: InvalidConfigurationException
| _: OptionException) =>
logger.debug(s"Failed config command with args '${args.mkString("
")}'", e)
@@ -113,176 +88,6 @@ object ConfigCommand extends Logging {
}
}
- private def processCommandWithZk(zkConnectString: String, opts:
ConfigCommandOptions): Unit = {
- val zkClientConfig =
ZkSecurityMigrator.createZkClientConfigFromOption(opts.options,
opts.zkTlsConfigFile)
- .getOrElse(new ZKClientConfig())
- val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled ||
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
- Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name =
"ConfigCommand", enableEntityConfigControllerCheck = false)
- val adminZkClient = new AdminZkClient(zkClient)
- try {
- if (opts.options.has(opts.alterOpt))
- alterConfigWithZk(zkClient, opts, adminZkClient)
- else if (opts.options.has(opts.describeOpt))
- describeConfigWithZk(zkClient, opts, adminZkClient)
- } finally {
- zkClient.close()
- }
- }
-
- def alterConfigWithZk(zkClient: KafkaZkClient, opts: ConfigCommandOptions,
adminZkClient: AdminZkClient): Unit = {
- val configsToBeAdded = parseConfigsToBeAdded(opts)
- val configsToBeDeleted = parseConfigsToBeDeleted(opts)
- val entity = parseEntity(opts)
- val entityType = entity.root.entityType
- val entityName = entity.fullSanitizedName
- val errorMessage = s"--bootstrap-server option must be specified to update
$entityType configs: {add: $configsToBeAdded, delete: $configsToBeDeleted}"
- var isUserClientId = false
-
- if (entityType == ConfigType.USER) {
- isUserClientId = entity.child.exists(e =>
ConfigType.CLIENT.equals(e.entityType))
- if (!configsToBeAdded.isEmpty || configsToBeDeleted.nonEmpty) {
- val info = "User configuration updates using ZooKeeper are only
supported for SCRAM credential updates."
- val scramMechanismNames = ScramMechanism.values.map(_.mechanismName)
- // make sure every added/deleted configs are SCRAM related, other
configs are not supported using zookeeper
-
require(configsToBeAdded.stringPropertyNames.asScala.forall(scramMechanismNames.contains),
- s"$errorMessage. $info")
- require(configsToBeDeleted.forall(scramMechanismNames.contains),
s"$errorMessage. $info")
- }
- preProcessScramCredentials(configsToBeAdded)
- } else if (entityType == ConfigType.BROKER) {
- // Dynamic broker configs can be updated using ZooKeeper only if the
corresponding broker is not running.
- if (!configsToBeAdded.isEmpty || configsToBeDeleted.nonEmpty) {
- validateBrokersNotRunning(entityName, adminZkClient, zkClient,
errorMessage)
-
- val perBrokerConfig = entityName != ZooKeeperInternals.DEFAULT_STRING
- preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig)
- }
- }
-
- // compile the final set of configs
- val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- configs ++= configsToBeAdded
- configsToBeDeleted.foreach(configs.remove(_))
-
- adminZkClient.changeConfigs(entityType, entityName, configs,
isUserClientId)
-
- System.out.println(s"Completed updating config for entity: $entity.")
- }
-
- private def validateBrokersNotRunning(entityName: String,
- adminZkClient: AdminZkClient,
- zkClient: KafkaZkClient,
- errorMessage: String): Unit = {
- val perBrokerConfig = entityName != ZooKeeperInternals.DEFAULT_STRING
- val info = "Broker configuration operations using ZooKeeper are only
supported if the affected broker(s) are not running."
- if (perBrokerConfig) {
- adminZkClient.parseBroker(entityName).foreach { brokerId =>
- require(zkClient.getBroker(brokerId).isEmpty, s"$errorMessage - broker
$brokerId is running. $info")
- }
- } else {
- val runningBrokersCount = zkClient.getAllBrokersInCluster.size
- require(runningBrokersCount == 0, s"$errorMessage - $runningBrokersCount
brokers are running. $info")
- }
- }
-
- private def preProcessScramCredentials(configsToBeAdded: Properties): Unit =
{
- def scramCredential(mechanism: ScramMechanism, credentialStr: String):
String = {
- val pattern = "(?:iterations=([0-9]*),)?password=(.*)".r
- val (iterations, password) = credentialStr match {
- case pattern(iterations, password) => (if (iterations != null)
iterations.toInt else DefaultScramIterations, password)
- case _ => throw new IllegalArgumentException(s"Invalid credential
property $mechanism=$credentialStr")
- }
- if (iterations < mechanism.minIterations())
- throw new IllegalArgumentException(s"Iterations $iterations is less
than the minimum ${mechanism.minIterations()} required for $mechanism")
- val credential = new
ScramFormatter(mechanism).generateCredential(password, iterations)
- ScramCredentialUtils.credentialToString(credential)
- }
- for (mechanism <- ScramMechanism.values) {
- configsToBeAdded.getProperty(mechanism.mechanismName) match {
- case null =>
- case value =>
- configsToBeAdded.setProperty(mechanism.mechanismName,
scramCredential(mechanism, value))
- }
- }
- }
-
- def createPasswordEncoder(encoderConfigs: java.util.Map[String, String]):
PasswordEncoder = {
- val encoderSecret =
Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
- .orElseThrow(() => new IllegalArgumentException("Password encoder secret
not specified"))
- PasswordEncoder.encrypting(new Password(encoderSecret),
- null,
-
encoderConfigs.getOrDefault(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
-
Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG))
- .map[Int](Integer.parseInt)
- .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
-
Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG))
- .map[Int](Integer.parseInt)
- .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT)
- )
- }
-
- /**
- * Pre-process broker configs provided to convert them to persistent format.
- * Password configs are encrypted using the secret
`PasswordEncoderConfigs.SECRET`.
- * The secret is removed from `configsToBeAdded` and will not be persisted
in ZooKeeper.
- */
- private def preProcessBrokerConfigs(configsToBeAdded: Properties,
perBrokerConfig: Boolean): Unit = {
- val passwordEncoderConfigs = new Properties
- passwordEncoderConfigs ++= configsToBeAdded.asScala.filter { case (key, _)
=> key.startsWith("password.encoder.") }
- if (!passwordEncoderConfigs.isEmpty) {
- info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be
used for encrypting" +
- " passwords, but will not be stored in ZooKeeper.")
- passwordEncoderConfigs.asScala.keySet.foreach(configsToBeAdded.remove)
- }
-
- DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
- val passwordConfigs =
configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
- if (passwordConfigs.nonEmpty) {
-
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
- s"${PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG} must be
specified to update $passwordConfigs." +
- " Other password encoder configs like cipher algorithm and
iterations may also be specified" +
- " to override the default encoding parameters. Password encoder
configs will not be persisted" +
- " in ZooKeeper."
- )
- val passwordConfigsMap = new java.util.HashMap[String, String]
- passwordEncoderConfigs.forEach { (key, value) =>
- passwordConfigsMap.put(key.toString, value.toString)
- }
- val passwordEncoder = createPasswordEncoder(passwordConfigsMap)
- passwordConfigs.foreach { configName =>
- val encodedValue = passwordEncoder.encode(new
Password(configsToBeAdded.getProperty(configName)))
- configsToBeAdded.setProperty(configName, encodedValue)
- }
- }
- }
-
- def describeConfigWithZk(zkClient: KafkaZkClient, opts:
ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
- val configEntity = parseEntity(opts)
- val entityType = configEntity.root.entityType
- val describeAllUsers = entityType == ConfigType.USER &&
configEntity.root.sanitizedName.isEmpty && configEntity.child.isEmpty
- val entityName = configEntity.fullSanitizedName
- val errorMessage = s"--bootstrap-server option must be specified to
describe $entityType"
- if (entityType == ConfigType.BROKER) {
- // Dynamic broker configs can be described using ZooKeeper only if the
corresponding broker is not running.
- validateBrokersNotRunning(entityName, adminZkClient, zkClient,
errorMessage)
- }
-
- val entities = configEntity.getAllEntities(zkClient)
- for (entity <- entities) {
- val configs = adminZkClient.fetchEntityConfig(entity.root.entityType,
entity.fullSanitizedName)
- // When describing all users, don't include empty user nodes with only
<user, client> quota overrides.
- if (!configs.isEmpty || !describeAllUsers) {
- System.out.println("Configs for %s are %s"
- .format(entity, configs.asScala.map(kv => kv._1 + "=" +
kv._2).mkString(",")))
- }
- }
- }
@nowarn("cat=deprecation")
def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
@@ -693,119 +498,8 @@ object ConfigCommand extends Logging {
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30,
TimeUnit.SECONDS).asScala
}
- case class Entity(entityType: String, sanitizedName: Option[String]) {
- val entityPath: String = sanitizedName match {
- case Some(n) => entityType + "/" + n
- case None => entityType
- }
- override def toString: String = {
- val typeName = entityType match {
- case ConfigType.USER => "user-principal"
- case ConfigType.CLIENT => "client-id"
- case ConfigType.TOPIC => "topic"
- case ConfigType.GROUP => "group"
- case t => t
- }
- sanitizedName match {
- case Some(ZooKeeperInternals.DEFAULT_STRING) => "default " + typeName
- case Some(n) =>
- val desanitized = if (entityType == ConfigType.USER || entityType ==
ConfigType.CLIENT) Sanitizer.desanitize(n) else n
- s"$typeName '$desanitized'"
- case None => entityType
- }
- }
- }
-
- case class ConfigEntity(root: Entity, child: Option[Entity]) {
- val fullSanitizedName: String = root.sanitizedName.getOrElse("") +
child.map(s => "/" + s.entityPath).getOrElse("")
-
- def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = {
- // Describe option examples:
- // Describe entity with specified name:
- // --entity-type topics --entity-name topic1 (topic1)
- // Describe all entities of a type (topics/brokers/users/clients):
- // --entity-type topics (all topics)
- // Describe <user, client> quotas:
- // --entity-type users --entity-name user1 --entity-type clients
--entity-name client2 (<user1, client2>)
- // --entity-type users --entity-name userA --entity-type clients
(all clients of userA)
- // --entity-type users --entity-type clients (all <user, client>s))
- // Describe default quotas:
- // --entity-type users --entity-default (Default user)
- // --entity-type users --entity-default --entity-type clients
--entity-default (Default <user, client>)
- (root.sanitizedName, child) match {
- case (None, _) =>
- val rootEntities = zkClient.getAllEntitiesWithConfig(root.entityType)
- .map(name =>
ConfigEntity(Entity(root.entityType, Some(name)), child))
- child match {
- case Some(s) =>
- rootEntities.flatMap(rootEntity =>
- ConfigEntity(rootEntity.root, Some(Entity(s.entityType,
None))).getAllEntities(zkClient))
- case None => rootEntities
- }
- case (_, Some(childEntity)) =>
- childEntity.sanitizedName match {
- case Some(_) => Seq(this)
- case None =>
- zkClient.getAllEntitiesWithConfig(root.entityPath + "/" +
childEntity.entityType)
- .map(name => ConfigEntity(root,
Some(Entity(childEntity.entityType, Some(name)))))
-
- }
- case (_, None) =>
- Seq(this)
- }
- }
-
- override def toString: String = {
- root.toString + child.map(s => ", " + s.toString).getOrElse("")
- }
- }
-
- def parseEntity(opts: ConfigCommandOptions): ConfigEntity = {
- val entityTypes = opts.entityTypes
- val entityNames = opts.entityNames
- if (entityTypes.head == ConfigType.USER || entityTypes.head ==
ConfigType.CLIENT)
- parseClientQuotaEntity(opts, entityTypes, entityNames)
- else {
- // Exactly one entity type and at-most one entity name expected for
other entities
- val name = entityNames.headOption match {
- case Some("") => Some(ZooKeeperInternals.DEFAULT_STRING)
- case v => v
- }
- ConfigEntity(Entity(entityTypes.head, name), None)
- }
- }
-
- private def parseClientQuotaEntity(opts: ConfigCommandOptions, types:
List[String], names: List[String]): ConfigEntity = {
- if (opts.options.has(opts.alterOpt) && names.size != types.size)
- throw new IllegalArgumentException("--entity-name or --entity-default
must be specified with each --entity-type for --alter")
-
- val reverse = types.size == 2 && types.head == ConfigType.CLIENT
- val entityTypes = if (reverse) types.reverse else types
- val sortedNames = (if (reverse && names.length == 2) names.reverse else
names).iterator
-
- def sanitizeName(entityType: String, name: String) = {
- if (name.isEmpty)
- ZooKeeperInternals.DEFAULT_STRING
- else {
- entityType match {
- case ConfigType.USER | ConfigType.CLIENT => Sanitizer.sanitize(name)
- case _ => throw new IllegalArgumentException("Invalid entity type "
+ entityType)
- }
- }
- }
-
- val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext)
Some(sanitizeName(t, sortedNames.next())) else None))
- ConfigEntity(entities.head, entities.lift(1))
- }
class ConfigCommandOptions(args: Array[String]) extends
CommandDefaultOptions(args) {
-
- val zkConnectOpt: OptionSpec[String] = parser.accepts("zookeeper",
"DEPRECATED. The connection string for the zookeeper connection in the form
host:port. " +
- "Multiple URLS can be given to allow fail-over. Required when
configuring SCRAM credentials for users or " +
- "dynamic broker configs when the relevant broker(s) are down. Not
allowed otherwise.")
- .withRequiredArg
- .describedAs("urls")
- .ofType(classOf[String])
val bootstrapServerOpt: OptionSpec[String] =
parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
.withRequiredArg
.describedAs("server to connect to")
@@ -831,7 +525,7 @@ object ConfigCommand extends Logging {
.ofType(classOf[String])
private val entityDefault: OptionSpecBuilder =
parser.accepts("entity-default", "Default entity name for
clients/users/brokers/ips (applies to corresponding entity type)")
- val nl: String = System.lineSeparator()
+ private val nl: String = System.lineSeparator()
val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key
Value pairs of configs to add. Square brackets can be used to group values
which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of
valid configurations: " +
"For entity-type '" + ConfigType.TOPIC + "': " +
LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.BROKER + "': " +
DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl,
nl) +
@@ -879,10 +573,6 @@ object ConfigCommand extends Logging {
val clientMetrics: OptionSpec[String] = parser.accepts("client-metrics",
"The client metrics config resource name.")
.withRequiredArg
.ofType(classOf[String])
- val zkTlsConfigFile: OptionSpec[String] =
parser.accepts("zk-tls-config-file",
- "Identifies the file where ZooKeeper client TLS connectivity properties
are defined. Any properties other than " +
-
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(",
") + " are ignored.")
- .withRequiredArg().describedAs("ZooKeeper TLS
configuration").ofType(classOf[String])
options = parser.parse(args : _*)
private val entityFlags = List((topic, ConfigType.TOPIC),
@@ -930,10 +620,12 @@ object ConfigCommand extends Logging {
if (entityTypeVals.size != entityTypeVals.distinct.size)
throw new IllegalArgumentException(s"Duplicate entity type(s)
specified: ${entityTypeVals.diff(entityTypeVals.distinct).mkString(",")}")
- val (allowedEntityTypes, connectOptString) = if
(options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt))
- (BrokerSupportedConfigTypes, "--bootstrap-server or
--bootstrap-controller")
- else
- (ZkSupportedConfigTypes, "--zookeeper")
+ val (allowedEntityTypes, connectOptString) =
+ if (options.has(bootstrapServerOpt) ||
options.has(bootstrapControllerOpt)) {
+ (BrokerSupportedConfigTypes, "--bootstrap-server or
--bootstrap-controller")
+ } else {
+ throw new IllegalArgumentException("Either --bootstrap-server or
--bootstrap-controller must be specified.")
+ }
entityTypeVals.foreach(entityTypeVal =>
if (!allowedEntityTypes.contains(entityTypeVal))
@@ -952,19 +644,9 @@ object ConfigCommand extends Logging {
val hasEntityDefault = entityNames.exists(_.isEmpty)
val numConnectOptions = (if (options.has(bootstrapServerOpt)) 1 else 0) +
- (if (options.has(bootstrapControllerOpt)) 1 else 0) +
- (if (options.has(zkConnectOpt)) 1 else 0)
- if (numConnectOptions == 0)
- throw new IllegalArgumentException("One of the required
--bootstrap-server, --boostrap-controller, or --zookeeper arguments must be
specified")
- else if (numConnectOptions > 1)
- throw new IllegalArgumentException("Only one of --bootstrap-server,
--boostrap-controller, and --zookeeper can be specified")
-
- if (options.has(allOpt) && options.has(zkConnectOpt)) {
- throw new IllegalArgumentException(s"--bootstrap-server must be
specified for --all")
- }
- if (options.has(zkTlsConfigFile) && !options.has(zkConnectOpt)) {
- throw new IllegalArgumentException("Only the --zookeeper option can be
used with the --zk-tls-config-file option.")
- }
+ (if (options.has(bootstrapControllerOpt)) 1 else 0)
+ if (numConnectOptions > 1)
+ throw new IllegalArgumentException("Only one of --bootstrap-server or
--bootstrap-controller can be specified")
if (hasEntityName && (entityTypeVals.contains(ConfigType.BROKER) ||
entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker,
brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId
=>
try brokerId.toInt catch {
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java
b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index 6d3cba6d246..74fe3271f23 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -16,10 +16,6 @@
*/
package kafka.admin;
-import kafka.cluster.Broker;
-import kafka.zk.AdminZkClient;
-import kafka.zk.KafkaZkClient;
-
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterClientQuotasOptions;
import org.apache.kafka.clients.admin.AlterClientQuotasResult;
@@ -44,12 +40,8 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
-import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Sanitizer;
import org.apache.kafka.server.config.ConfigType;
-import org.apache.kafka.server.config.ZooKeeperInternals;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -59,7 +51,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -75,7 +66,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -87,17 +77,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConfigCommandTest {
- private static final String ZK_CONNECT = "localhost:2181";
- private static final DummyAdminZkClient DUMMY_ADMIN_ZK_CLIENT = new
DummyAdminZkClient(null);
-
- private static final List<String> ZOOKEEPER_BOOTSTRAP =
Arrays.asList("--zookeeper", ZK_CONNECT);
private static final List<String> BROKER_BOOTSTRAP =
Arrays.asList("--bootstrap-server", "localhost:9092");
private static final List<String> CONTROLLER_BOOTSTRAP =
Arrays.asList("--bootstrap-controller", "localhost:9093");
@@ -106,34 +91,6 @@ public class ConfigCommandTest {
assertNonZeroStatusExit("--blah");
}
- @Test
- public void shouldExitWithNonZeroStatusOnZkCommandWithTopicsEntity() {
- assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
- "--entity-type", "topics",
- "--describe")));
- }
-
- @Test
- public void shouldExitWithNonZeroStatusOnZkCommandWithClientsEntity() {
- assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
- "--entity-type", "clients",
- "--describe")));
- }
-
- @Test
- public void shouldExitWithNonZeroStatusOnZkCommandWithIpsEntity() {
- assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
- "--entity-type", "ips",
- "--describe")));
- }
-
- @Test
- public void shouldExitWithNonZeroStatusOnZkCommandWithGroupsEntity() {
- assertNonZeroStatusExit(toArray(ZOOKEEPER_BOOTSTRAP, Arrays.asList(
- "--entity-type", "groups",
- "--describe")));
- }
-
@Test
public void shouldExitWithNonZeroStatusAlterUserQuotaWithoutEntityName() {
assertNonZeroStatusExit(toArray(BROKER_BOOTSTRAP, Arrays.asList(
@@ -155,15 +112,6 @@ public class ConfigCommandTest {
"--describe", "--broker-defaults")));
}
- @Test
- public void
shouldExitWithNonZeroStatusOnBrokerCommandWithZkTlsConfigFile() {
- assertNonZeroStatusExit(
- "--bootstrap-server", "invalid host",
- "--entity-type", "users",
- "--zk-tls-config-file", "zk_tls_config.properties",
- "--describe");
- }
-
public static void assertNonZeroStatusExit(String... args) {
AtomicReference<Integer> exitStatus = new AtomicReference<>();
Exit.setExitProcedure((status, __) -> {
@@ -183,11 +131,6 @@ public class ConfigCommandTest {
assertEquals(1, exitStatus.get());
}
- @Test
- public void shouldFailParseArgumentsForClientsEntityTypeUsingZookeeper() {
- assertThrows(IllegalArgumentException.class, () ->
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "clients"));
- }
-
@Test
public void shouldParseArgumentsForClientsEntityTypeWithBrokerBootstrap() {
testArgumentParse(BROKER_BOOTSTRAP, "clients");
@@ -198,11 +141,6 @@ public class ConfigCommandTest {
testArgumentParse(CONTROLLER_BOOTSTRAP, "clients");
}
- @Test
- public void shouldParseArgumentsForUsersEntityTypeUsingZookeeper() {
- testArgumentParse(ZOOKEEPER_BOOTSTRAP, "users");
- }
-
@Test
public void shouldParseArgumentsForUsersEntityTypeWithBrokerBootstrap() {
testArgumentParse(BROKER_BOOTSTRAP, "users");
@@ -213,11 +151,6 @@ public class ConfigCommandTest {
testArgumentParse(CONTROLLER_BOOTSTRAP, "users");
}
- @Test
- public void shouldFailParseArgumentsForTopicsEntityTypeUsingZookeeper() {
- assertThrows(IllegalArgumentException.class, () ->
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "topics"));
- }
-
@Test
public void shouldParseArgumentsForTopicsEntityTypeWithBrokerBootstrap() {
testArgumentParse(BROKER_BOOTSTRAP, "topics");
@@ -228,11 +161,6 @@ public class ConfigCommandTest {
testArgumentParse(CONTROLLER_BOOTSTRAP, "topics");
}
- @Test
- public void shouldParseArgumentsForBrokersEntityTypeUsingZookeeper() {
- testArgumentParse(ZOOKEEPER_BOOTSTRAP, "brokers");
- }
-
@Test
public void shouldParseArgumentsForBrokersEntityTypeWithBrokerBootstrap() {
testArgumentParse(BROKER_BOOTSTRAP, "brokers");
@@ -253,11 +181,6 @@ public class ConfigCommandTest {
testArgumentParse(CONTROLLER_BOOTSTRAP, "broker-loggers");
}
- @Test
- public void shouldFailParseArgumentsForIpEntityTypeUsingZookeeper() {
- assertThrows(IllegalArgumentException.class, () ->
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "ips"));
- }
-
@Test
public void shouldParseArgumentsForIpEntityTypeWithBrokerBootstrap() {
testArgumentParse(BROKER_BOOTSTRAP, "ips");
@@ -268,11 +191,6 @@ public class ConfigCommandTest {
testArgumentParse(CONTROLLER_BOOTSTRAP, "ips");
}
- @Test
- public void shouldFailParseArgumentsForGroupEntityTypeUsingZookeeper() {
- assertThrows(IllegalArgumentException.class, () ->
testArgumentParse(ZOOKEEPER_BOOTSTRAP, "groups"));
- }
-
@Test
public void shouldParseArgumentsForGroupEntityTypeWithBrokerBootstrap() {
testArgumentParse(BROKER_BOOTSTRAP, "groups");
@@ -509,57 +427,35 @@ public class ConfigCommandTest {
assertEquals(createOpts.entityNames().toSeq(), seq(expectedNames));
}
- public void doTestOptionEntityTypeNames(boolean zkConfig) {
- List<String> connectOpts = zkConfig
- ? Arrays.asList("--zookeeper", ZK_CONNECT)
- : Arrays.asList("--bootstrap-server", "localhost:9092");
-
- // zookeeper config only supports "users" and "brokers" entity type
- if (!zkConfig) {
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC),
Collections.singletonList("A"), connectOpts, "--entity-type", "topics",
"--entity-name", "A");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP),
Collections.singletonList("1.2.3.4"), connectOpts, "--entity-name", "1.2.3.4",
"--entity-type", "ips");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS),
Collections.singletonList("A"), connectOpts, "--entity-type",
"client-metrics", "--entity-name", "A");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP),
Collections.singletonList("A"), connectOpts, "--entity-type", "groups",
"--entity-name", "A");
- testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER,
ConfigType.CLIENT), Arrays.asList("A", ""), connectOpts,
- "--entity-type", "users", "--entity-type", "clients",
"--entity-name", "A", "--entity-default");
- testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER,
ConfigType.CLIENT), Arrays.asList("", "B"), connectOpts,
- "--entity-default", "--entity-name", "B", "--entity-type",
"users", "--entity-type", "clients");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC),
Collections.singletonList("A"), connectOpts, "--topic", "A");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP),
Collections.singletonList("1.2.3.4"), connectOpts, "--ip", "1.2.3.4");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP),
Collections.singletonList("A"), connectOpts, "--group", "A");
- testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT,
ConfigType.USER), Arrays.asList("B", "A"), connectOpts, "--client", "B",
"--user", "A");
- testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT,
ConfigType.USER), Arrays.asList("B", ""), connectOpts, "--client", "B",
"--user-defaults");
- testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT,
ConfigType.USER), Collections.singletonList("A"), connectOpts,
- "--entity-type", "clients", "--entity-type", "users",
"--entity-name", "A");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC),
Collections.emptyList(), connectOpts, "--entity-type", "topics");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP),
Collections.emptyList(), connectOpts, "--entity-type", "ips");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP),
Collections.emptyList(), connectOpts, "--entity-type", "groups");
-
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS),
Collections.emptyList(), connectOpts, "--entity-type", "client-metrics");
- }
-
+ @Test
+ public void testOptionEntityTypeNames() {
+ List<String> connectOpts = Arrays.asList("--bootstrap-server",
"localhost:9092");
+
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC),
Collections.singletonList("A"), connectOpts, "--entity-type", "topics",
"--entity-name", "A");
+ testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP),
Collections.singletonList("1.2.3.4"), connectOpts, "--entity-name", "1.2.3.4",
"--entity-type", "ips");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS),
Collections.singletonList("A"), connectOpts, "--entity-type",
"client-metrics", "--entity-name", "A");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP),
Collections.singletonList("A"), connectOpts, "--entity-type", "groups",
"--entity-name", "A");
+ testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER,
ConfigType.CLIENT), Arrays.asList("A", ""), connectOpts,
+ "--entity-type", "users", "--entity-type", "clients",
"--entity-name", "A", "--entity-default");
+ testExpectedEntityTypeNames(Arrays.asList(ConfigType.USER,
ConfigType.CLIENT), Arrays.asList("", "B"), connectOpts,
+ "--entity-default", "--entity-name", "B", "--entity-type",
"users", "--entity-type", "clients");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC),
Collections.singletonList("A"), connectOpts, "--topic", "A");
+ testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP),
Collections.singletonList("1.2.3.4"), connectOpts, "--ip", "1.2.3.4");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP),
Collections.singletonList("A"), connectOpts, "--group", "A");
+ testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT,
ConfigType.USER), Arrays.asList("B", "A"), connectOpts, "--client", "B",
"--user", "A");
+ testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT,
ConfigType.USER), Arrays.asList("B", ""), connectOpts, "--client", "B",
"--user-defaults");
+ testExpectedEntityTypeNames(Arrays.asList(ConfigType.CLIENT,
ConfigType.USER), Collections.singletonList("A"), connectOpts,
+ "--entity-type", "clients", "--entity-type", "users",
"--entity-name", "A");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.TOPIC),
Collections.emptyList(), connectOpts, "--entity-type", "topics");
+ testExpectedEntityTypeNames(Collections.singletonList(ConfigType.IP),
Collections.emptyList(), connectOpts, "--entity-type", "ips");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.GROUP),
Collections.emptyList(), connectOpts, "--entity-type", "groups");
+
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.CLIENT_METRICS),
Collections.emptyList(), connectOpts, "--entity-type", "client-metrics");
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.BROKER),
Collections.singletonList("0"), connectOpts, "--entity-name", "0",
"--entity-type", "brokers");
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.BROKER),
Collections.singletonList("0"), connectOpts, "--broker", "0");
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.USER),
Collections.emptyList(), connectOpts, "--entity-type", "users");
testExpectedEntityTypeNames(Collections.singletonList(ConfigType.BROKER),
Collections.emptyList(), connectOpts, "--entity-type", "brokers");
}
- @Test
- public void testOptionEntityTypeNamesUsingZookeeper() {
- doTestOptionEntityTypeNames(true);
- }
-
- @Test
- public void testOptionEntityTypeNames() {
- doTestOptionEntityTypeNames(false);
- }
-
- @Test
- public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "client", "--entity-type", "not-recognised",
"--alter", "--add-config", "a=b,c=d"});
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldFailIfUnrecognisedEntityType() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server",
"localhost:9092",
@@ -567,13 +463,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts));
}
- @Test
- public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "A", "--entity-type", "brokers", "--alter",
"--add-config", "a=b,c=d"});
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldFailIfBrokerEntityTypeIsNotAnInteger() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server",
"localhost:9092",
@@ -581,13 +470,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts));
}
- @Test
- public void
shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--broker", "A", "--alter", "--add-config", "a=b,c=d"});
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server",
"localhost:9092",
@@ -595,13 +477,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts));
}
- @Test
- public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "A", "--entity-type", "users", "--client", "B",
"--describe"});
- assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
- }
-
@Test
public void shouldFailIfMixedEntityTypeFlags() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server",
"localhost:9092",
@@ -616,13 +491,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
}
- @Test
- public void shouldFailIfInvalidHostUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "A,B", "--entity-type", "ips", "--describe"});
- assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
- }
-
@Test
public void shouldFailIfUnresolvableHost() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server",
"localhost:9092",
@@ -630,69 +498,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
}
- @Test
- public void shouldFailIfUnresolvableHostUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "RFC2606.invalid", "--entity-type", "ips",
"--describe"});
- assertThrows(IllegalArgumentException.class, createOpts::checkArgs);
- }
-
- @Test
- public void shouldAddClientConfigUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "my-client-id",
- "--entity-type", "clients",
- "--alter",
- "--add-config", "a=b,c=d"});
-
- KafkaZkClient zkClient = mock(KafkaZkClient.class);
- when(zkClient.getEntityConfigs(anyString(),
anyString())).thenReturn(new Properties());
-
- class TestAdminZkClient extends AdminZkClient {
- public TestAdminZkClient(KafkaZkClient zkClient) {
- super(zkClient, scala.None$.empty());
- }
-
- @Override
- public void changeClientIdConfig(String clientId, Properties
configChange) {
- assertEquals("my-client-id", clientId);
- assertEquals("b", configChange.get("a"));
- assertEquals("d", configChange.get("c"));
- }
- }
-
- // Changing USER configs don't use `KafkaZkClient` so it safe to pass
`null`.
- ConfigCommand.alterConfigWithZk(null, createOpts, new
TestAdminZkClient(zkClient));
- }
-
- @Test
- public void shouldAddIpConfigsUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-name", "1.2.3.4",
- "--entity-type", "ips",
- "--alter",
- "--add-config", "a=b,c=d"});
-
- KafkaZkClient zkClient = mock(KafkaZkClient.class);
- when(zkClient.getEntityConfigs(anyString(),
anyString())).thenReturn(new Properties());
-
- class TestAdminZkClient extends AdminZkClient {
- public TestAdminZkClient(KafkaZkClient zkClient) {
- super(zkClient, scala.None$.empty());
- }
-
- @Override
- public void changeIpConfig(String ip, Properties configChange) {
- assertEquals("1.2.3.4", ip);
- assertEquals("b", configChange.get("a"));
- assertEquals("d", configChange.get("c"));
- }
- }
-
- // Changing USER configs don't use `KafkaZkClient` so it safe to pass
`null`.
- ConfigCommand.alterConfigWithZk(null, createOpts, new
TestAdminZkClient(zkClient));
- }
-
private Entry<List<String>, Map<String, String>>
argsAndExpectedEntity(Optional<String> entityName, String entityType) {
String command;
switch (entityType) {
@@ -983,27 +788,6 @@ public class ConfigCommandTest {
verifyUserScramCredentialsNotDescribed(defaultUserOpt);
}
- @Test
- public void shouldAddTopicConfigUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "my-topic",
- "--entity-type", "topics",
- "--alter",
- "--add-config", "a=b,c=d"));
-
- KafkaZkClient zkClient = mock(KafkaZkClient.class);
- when(zkClient.getEntityConfigs(anyString(),
anyString())).thenReturn(new Properties());
-
- ConfigCommand.alterConfigWithZk(null, createOpts, new
AdminZkClient(zkClient, scala.None$.empty()) {
- @Override
- public void changeTopicConfig(String topic, Properties
configChange) {
- assertEquals("my-topic", topic);
- assertEquals("b", configChange.get("a"));
- assertEquals("d", configChange.get("c"));
- }
- });
- }
-
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldAlterTopicConfig(boolean file) {
@@ -1114,64 +898,6 @@ public class ConfigCommandTest {
verify(describeResult).all();
}
- @Test
- public void
shouldNotAllowAddBrokerQuotaConfigWhileBrokerUpUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--alter",
- "--add-config",
"leader.replication.throttled.rate=10,follower.replication.throttled.rate=20"));
-
- KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
- Broker mockBroker = mock(Broker.class);
-
when(mockZkClient.getBroker(1)).thenReturn(scala.Option.apply(mockBroker));
-
- assertThrows(IllegalArgumentException.class,
- () -> ConfigCommand.alterConfigWithZk(mockZkClient, alterOpts,
DUMMY_ADMIN_ZK_CLIENT));
- }
-
- @Test
- public void shouldNotAllowDescribeBrokerWhileBrokerUpUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--describe"));
-
- KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
- Broker mockBroker = mock(Broker.class);
-
when(mockZkClient.getBroker(1)).thenReturn(scala.Option.apply(mockBroker));
-
- assertThrows(IllegalArgumentException.class,
- () -> ConfigCommand.describeConfigWithZk(mockZkClient,
describeOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
- @Test
- public void shouldSupportDescribeBrokerBeforeBrokerUpUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--describe"));
-
- class TestAdminZkClient extends AdminZkClient {
- public TestAdminZkClient(KafkaZkClient zkClient) {
- super(zkClient, scala.None$.empty());
- }
-
- @Override
- public Properties fetchEntityConfig(String rootEntityType, String
sanitizedEntityName) {
- assertEquals("brokers", rootEntityType);
- assertEquals("1", sanitizedEntityName);
-
- return new Properties();
- }
- }
-
- KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
- when(mockZkClient.getBroker(1)).thenReturn(scala.None$.empty());
-
- ConfigCommand.describeConfigWithZk(mockZkClient, describeOpts, new
TestAdminZkClient(null));
- }
-
@Test
public void shouldAddBrokerLoggerConfig() {
Node node = new Node(1, "localhost", 9092);
@@ -1182,16 +908,6 @@ public class ConfigCommandTest {
));
}
- @Test
- public void testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed() {
- String[] optsList = new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-type", ConfigType.BROKER,
- "--describe"
- };
-
- new ConfigCommand.ConfigCommandOptions(optsList).checkArgs();
- }
-
@Test
public void
testNoSpecifiedEntityOptionWithDescribeBrokersInBootstrapServerIsAllowed() {
String[] optsList = new String[]{"--bootstrap-server",
"localhost:9092",
@@ -1224,17 +940,6 @@ public class ConfigCommandTest {
new ConfigCommand.ConfigCommandOptions(optsList).checkArgs();
}
- @Test
- public void testDescribeAllBrokerConfigBootstrapServerRequired() {
- String[] optsList = new String[]{"--zookeeper", ZK_CONNECT,
- "--entity-type", ConfigType.BROKER,
- "--entity-name", "1",
- "--describe",
- "--all"};
-
- assertThrows(IllegalArgumentException.class, () -> new
ConfigCommand.ConfigCommandOptions(optsList).checkArgs());
- }
-
@Test
public void testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed() {
String[] optsList = new String[]{"--bootstrap-server",
"localhost:9092",
@@ -1428,44 +1133,6 @@ public class ConfigCommandTest {
verify(describeResult).all();
}
- @Test
- public void shouldSupportCommaSeparatedValuesUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "my-topic",
- "--entity-type", "topics",
- "--alter",
- "--add-config", "a=b,c=[d,e ,f],g=[h,i]"));
-
- KafkaZkClient zkClient = mock(KafkaZkClient.class);
- when(zkClient.getEntityConfigs(anyString(),
anyString())).thenReturn(new Properties());
-
- class TestAdminZkClient extends AdminZkClient {
- public TestAdminZkClient(KafkaZkClient zkClient) {
- super(zkClient, scala.None$.empty());
- }
-
- @Override
- public void changeTopicConfig(String topic, Properties
configChange) {
- assertEquals("my-topic", topic);
- assertEquals("b", configChange.get("a"));
- assertEquals("d,e ,f", configChange.get("c"));
- assertEquals("h,i", configChange.get("g"));
- }
- }
-
- ConfigCommand.alterConfigWithZk(null, createOpts, new
TestAdminZkClient(zkClient));
- }
-
- @Test
- public void
shouldNotUpdateBrokerConfigIfMalformedEntityNameUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1,2,3", //Don't support multiple brokers
currently
- "--entity-type", "brokers",
- "--alter",
- "--add-config", "leader.replication.throttled.rate=10"));
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldNotUpdateBrokerConfigIfMalformedEntityName() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
@@ -1476,16 +1143,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts));
}
- @Test
- public void shouldNotUpdateBrokerConfigIfMalformedConfigUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--alter",
- "--add-config", "a=="));
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldNotUpdateBrokerConfigIfMalformedConfig() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
@@ -1496,16 +1153,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts));
}
- @Test
- public void
shouldNotUpdateBrokerConfigIfMalformedBracketConfigUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--alter",
- "--add-config", "a=[b,c,d=e"));
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server",
"localhost:9092",
@@ -1516,16 +1163,6 @@ public class ConfigCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)),
createOpts));
}
- @Test
- public void
shouldNotUpdateConfigIfNonExistingConfigIsDeletedUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "my-topic",
- "--entity-type", "topics",
- "--alter",
- "--delete-config", "missing_config1, missing_config2"));
- assertThrows(InvalidConfigurationException.class, () ->
ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT));
- }
-
@Test
public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
String resourceName = "my-topic";
@@ -1558,308 +1195,6 @@ public class ConfigCommandTest {
verify(describeResult).all();
}
- @Test
- public void shouldNotDeleteBrokerConfigWhileBrokerUpUsingZookeeper() {
- ConfigCommand.ConfigCommandOptions createOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "1",
- "--entity-type", "brokers",
- "--alter",
- "--delete-config", "a,c"));
-
- class TestAdminZkClient extends AdminZkClient {
- public TestAdminZkClient(KafkaZkClient zkClient) {
- super(zkClient, scala.None$.empty());
- }
-
- @Override
- public Properties fetchEntityConfig(String rootEntityType, String
sanitizedEntityName) {
- Properties properties = new Properties();
- properties.put("a", "b");
- properties.put("c", "d");
- properties.put("e", "f");
- return properties;
- }
-
- @Override
- public void changeBrokerConfig(Seq<Object> brokers, Properties
configChange) {
- assertEquals("f", configChange.get("e"));
- assertEquals(1, configChange.size());
- }
- }
-
- KafkaZkClient mockZkClient = mock(KafkaZkClient.class);
- Broker mockBroker = mock(Broker.class);
-
when(mockZkClient.getBroker(1)).thenReturn(scala.Option.apply(mockBroker));
-
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(mockZkClient, createOpts, new
TestAdminZkClient(null)));
- }
-
- private ConfigCommand.ConfigCommandOptions createOpts(String user, String
config) {
- return new ConfigCommand.ConfigCommandOptions(toArray("--zookeeper",
ZK_CONNECT,
- "--entity-name", user,
- "--entity-type", "users",
- "--alter",
- "--add-config", config));
- }
-
- private ConfigCommand.ConfigCommandOptions deleteOpts(String user, String
mechanism) {
- return new ConfigCommand.ConfigCommandOptions(toArray("--zookeeper",
ZK_CONNECT,
- "--entity-name", user,
- "--entity-type", "users",
- "--alter",
- "--delete-config", mechanism));
- }
-
- @Test
- public void testScramCredentials() {
- Map<String, Properties> credentials = new HashMap<>();
- class CredentialChange extends AdminZkClient {
- private final String user;
- private final Set<String> mechanisms;
- private final int iterations;
-
- public CredentialChange(String user, Set<String> mechanisms, int
iterations) {
- super(null, scala.None$.empty());
- this.user = user;
- this.mechanisms = mechanisms;
- this.iterations = iterations;
- }
-
- @Override
- public Properties fetchEntityConfig(String entityType, String
entityName) {
- return credentials.getOrDefault(entityName, new Properties());
- }
-
- @Override
- public void changeUserOrUserClientIdConfig(String
sanitizedEntityName, Properties configChange, boolean isUserClientId) {
- assertEquals(user, sanitizedEntityName);
- assertEquals(mechanisms, configChange.keySet());
- for (String mechanism : mechanisms) {
- String value = configChange.getProperty(mechanism);
- assertEquals(-1, value.indexOf("password="));
- ScramCredential scramCredential =
ScramCredentialUtils.credentialFromString(value);
- if (iterations != scramCredential.iterations())
-
System.out.println("CredentialChange.changeUserOrUserClientIdConfig");
- assertEquals(iterations, scramCredential.iterations());
- credentials.put(user, configChange);
- }
- }
- }
- ConfigCommand.ConfigCommandOptions optsA = createOpts("userA",
"SCRAM-SHA-256=[iterations=8192,password=abc, def]");
- ConfigCommand.alterConfigWithZk(null, optsA, new
CredentialChange("userA", Collections.singleton("SCRAM-SHA-256"), 8192));
- ConfigCommand.ConfigCommandOptions optsB = createOpts("userB",
"SCRAM-SHA-256=[iterations=4096,password=abc,
def],SCRAM-SHA-512=[password=1234=abc]");
- ConfigCommand.alterConfigWithZk(null, optsB, new
CredentialChange("userB", new HashSet<>(Arrays.asList("SCRAM-SHA-256",
"SCRAM-SHA-512")), 4096));
-
- ConfigCommand.ConfigCommandOptions del256 = deleteOpts("userB",
"SCRAM-SHA-256");
- ConfigCommand.alterConfigWithZk(null, del256, new
CredentialChange("userB", Collections.singleton("SCRAM-SHA-512"), 4096));
- ConfigCommand.ConfigCommandOptions del512 = deleteOpts("userB",
"SCRAM-SHA-512");
- ConfigCommand.alterConfigWithZk(null, del512, new
CredentialChange("userB", Collections.emptySet(), 4096));
- }
-
- @Test
- public void testQuotaConfigEntityUsingZookeeperNotAllowed() {
- assertThrows(IllegalArgumentException.class, () ->
doTestQuotaConfigEntity(true));
- }
-
- private List<String> connectOpts;
-
- private ConfigCommand.ConfigCommandOptions createOpts(String entityType,
Optional<String> entityName, List<String> otherArgs) {
- List<String> optArray = Arrays.asList(connectOpts.get(0),
connectOpts.get(1), "--entity-type", entityType);
- List<String> nameArray = entityName
- .map(s -> Arrays.asList("--entity-name", s))
- .orElse(Collections.emptyList());
- return new ConfigCommand.ConfigCommandOptions(toArray(optArray,
nameArray, otherArgs));
- }
-
- private void checkEntity(String entityType, Optional<String> entityName,
String expectedEntityName, List<String> otherArgs) {
- ConfigCommand.ConfigCommandOptions opts = createOpts(entityType,
entityName, otherArgs);
- opts.checkArgs();
- ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(opts);
- assertEquals(entityType, entity.root().entityType());
- assertEquals(expectedEntityName, entity.fullSanitizedName());
- }
-
- private void checkInvalidArgs(String entityType, Optional<String>
entityName, List<String> otherArgs) {
- ConfigCommand.ConfigCommandOptions opts = createOpts(entityType,
entityName, otherArgs);
- assertThrows(IllegalArgumentException.class, opts::checkArgs);
- }
-
- private void checkInvalidEntity(String entityType, Optional<String>
entityName, List<String> otherArgs) {
- ConfigCommand.ConfigCommandOptions opts = createOpts(entityType,
entityName, otherArgs);
- opts.checkArgs();
- assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.parseEntity(opts));
- }
-
- public void doTestQuotaConfigEntity(boolean zkConfig) {
- connectOpts = zkConfig
- ? Arrays.asList("--zookeeper", ZK_CONNECT)
- : Arrays.asList("--bootstrap-server", "localhost:9092");
-
- List<String> describeOpts = Collections.singletonList("--describe");
- List<String> alterOpts = Arrays.asList("--alter", "--add-config",
"a=b,c=d");
-
- // <client-id> quota
- String clientId = "client-1";
- for (List<String> opts: Arrays.asList(describeOpts, alterOpts)) {
- checkEntity("clients", Optional.of(clientId), clientId, opts);
- checkEntity("clients", Optional.of(""),
ZooKeeperInternals.DEFAULT_STRING, opts);
- }
- checkEntity("clients", Optional.empty(), "", describeOpts);
- checkInvalidArgs("clients", Optional.empty(), alterOpts);
-
- // <user> quota
- String principal = "CN=ConfigCommandTest,O=Apache,L=<default>";
- String sanitizedPrincipal = Sanitizer.sanitize(principal);
- assertEquals(-1, sanitizedPrincipal.indexOf('='));
- assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
- for (List<String> opts: Arrays.asList(describeOpts, alterOpts)) {
- checkEntity("users", Optional.of(principal), sanitizedPrincipal,
opts);
- checkEntity("users", Optional.of(""),
ZooKeeperInternals.DEFAULT_STRING, opts);
- }
- checkEntity("users", Optional.empty(), "", describeOpts);
- checkInvalidArgs("users", Optional.empty(), alterOpts);
-
- // <user, client-id> quota
- String userClient = sanitizedPrincipal + "/clients/" + clientId;
- Function<String, List<String>> clientIdOpts = name ->
Arrays.asList("--entity-type", "clients", "--entity-name", name);
- for (List<String> opts : Arrays.asList(describeOpts, alterOpts)) {
- checkEntity("users", Optional.of(principal), userClient,
concat(opts, clientIdOpts.apply(clientId)));
- checkEntity("users", Optional.of(principal), sanitizedPrincipal +
"/clients/" + ZooKeeperInternals.DEFAULT_STRING, concat(opts,
clientIdOpts.apply("")));
- checkEntity("users", Optional.of(""),
ZooKeeperInternals.DEFAULT_STRING + "/clients/" + clientId,
concat(describeOpts, clientIdOpts.apply(clientId)));
- checkEntity("users", Optional.of(""),
ZooKeeperInternals.DEFAULT_STRING + "/clients/" +
ZooKeeperInternals.DEFAULT_STRING, concat(opts, clientIdOpts.apply("")));
- }
- checkEntity("users", Optional.of(principal), sanitizedPrincipal +
"/clients", concat(describeOpts, Arrays.asList("--entity-type", "clients")));
- // Both user and client-id must be provided for alter
- checkInvalidEntity("users", Optional.of(principal), concat(alterOpts,
Arrays.asList("--entity-type", "clients")));
- checkInvalidEntity("users", Optional.empty(), concat(alterOpts,
clientIdOpts.apply(clientId)));
- checkInvalidArgs("users", Optional.empty(), concat(alterOpts,
Arrays.asList("--entity-type", "clients")));
- }
-
- @Test
- public void testQuotaConfigEntity() {
- doTestQuotaConfigEntity(false);
- }
-
- @Test
- public void testUserClientQuotaOptsUsingZookeeperNotAllowed() {
- assertThrows(IllegalArgumentException.class, () ->
doTestUserClientQuotaOpts(true));
- }
-
- private void checkEntity(String expectedEntityType, String
expectedEntityName, String...args) {
- ConfigCommand.ConfigCommandOptions opts = new
ConfigCommand.ConfigCommandOptions(toArray(connectOpts, Arrays.asList(args)));
- opts.checkArgs();
- ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(opts);
- assertEquals(expectedEntityType, entity.root().entityType());
- assertEquals(expectedEntityName, entity.fullSanitizedName());
- }
-
- private void doTestUserClientQuotaOpts(boolean zkConfig) {
- connectOpts = zkConfig
- ? Arrays.asList("--zookeeper", ZK_CONNECT)
- : Arrays.asList("--bootstrap-server", "localhost:9092");
-
- // <default> is a valid user principal and client-id (can be handled
with URL-encoding),
- checkEntity("users", Sanitizer.sanitize("<default>"),
- "--entity-type", "users", "--entity-name", "<default>",
- "--alter", "--add-config", "a=b,c=d");
- checkEntity("clients", Sanitizer.sanitize("<default>"),
- "--entity-type", "clients", "--entity-name", "<default>",
- "--alter", "--add-config", "a=b,c=d");
-
- checkEntity("users", Sanitizer.sanitize("CN=user1") +
"/clients/client1",
- "--entity-type", "users", "--entity-name", "CN=user1",
"--entity-type", "clients", "--entity-name", "client1",
- "--alter", "--add-config", "a=b,c=d");
- checkEntity("users", Sanitizer.sanitize("CN=user1") +
"/clients/client1",
- "--entity-name", "CN=user1", "--entity-type", "users",
"--entity-name", "client1", "--entity-type", "clients",
- "--alter", "--add-config", "a=b,c=d");
- checkEntity("users", Sanitizer.sanitize("CN=user1") +
"/clients/client1",
- "--entity-type", "clients", "--entity-name", "client1",
"--entity-type", "users", "--entity-name", "CN=user1",
- "--alter", "--add-config", "a=b,c=d");
- checkEntity("users", Sanitizer.sanitize("CN=user1") +
"/clients/client1",
- "--entity-name", "client1", "--entity-type", "clients",
"--entity-name", "CN=user1", "--entity-type", "users",
- "--alter", "--add-config", "a=b,c=d");
- checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients",
- "--entity-type", "clients", "--entity-name", "CN=user1",
"--entity-type", "users",
- "--describe");
- checkEntity("users", "/clients",
- "--entity-type", "clients", "--entity-type", "users",
- "--describe");
- checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/" +
Sanitizer.sanitize("client1?@%"),
- "--entity-name", "client1?@%", "--entity-type", "clients",
"--entity-name", "CN=user1", "--entity-type", "users",
- "--alter", "--add-config", "a=b,c=d");
- }
-
- @Test
- public void testUserClientQuotaOpts() {
- doTestUserClientQuotaOpts(false);
- }
-
- private final KafkaZkClient zkClient = mock(KafkaZkClient.class);
-
- public void checkEntities(List<String> opts, Map<String, List<String>>
expectedFetches, List<String> expectedEntityNames) {
- ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(new
ConfigCommand.ConfigCommandOptions(toArray(opts,
Collections.singletonList("--describe"))));
- expectedFetches.forEach((name, values) ->
-
when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(seq(values)));
- Seq<ConfigCommand.ConfigEntity> entities0 =
entity.getAllEntities(zkClient);
- List<ConfigCommand.ConfigEntity> entities = new ArrayList<>();
- entities0.foreach(e -> {
- entities.add(e);
- return null;
- });
- assertEquals(
- expectedEntityNames,
-
entities.stream().map(ConfigCommand.ConfigEntity::fullSanitizedName).collect(Collectors.toList()));
- }
-
- @Test
- public void testQuotaDescribeEntities() {
- String clientId = "a-client";
- String principal = "CN=ConfigCommandTest.testQuotaDescribeEntities ,
O=Apache, L=<default>";
- String sanitizedPrincipal = Sanitizer.sanitize(principal);
- String userClient = sanitizedPrincipal + "/clients/" + clientId;
-
- List<String> opts = Arrays.asList("--entity-type", "clients",
"--entity-name", clientId);
- checkEntities(opts, Collections.emptyMap(),
Collections.singletonList(clientId));
-
- opts = Arrays.asList("--entity-type", "clients", "--entity-default");
- checkEntities(opts, Collections.emptyMap(),
Collections.singletonList("<default>"));
-
- opts = Arrays.asList("--entity-type", "clients");
- checkEntities(opts, Collections.singletonMap("clients",
Collections.singletonList(clientId)), Collections.singletonList(clientId));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-name",
principal);
- checkEntities(opts, Collections.emptyMap(),
Collections.singletonList(sanitizedPrincipal));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-default");
- checkEntities(opts, Collections.emptyMap(),
Collections.singletonList("<default>"));
-
- opts = Arrays.asList("--entity-type", "users");
- checkEntities(opts, Collections.singletonMap("users",
Arrays.asList("<default>", sanitizedPrincipal)), Arrays.asList("<default>",
sanitizedPrincipal));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-name",
principal, "--entity-type", "clients", "--entity-name", clientId);
- checkEntities(opts, Collections.emptyMap(),
Collections.singletonList(userClient));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-name",
principal, "--entity-type", "clients", "--entity-default");
- checkEntities(opts, Collections.emptyMap(),
Collections.singletonList(sanitizedPrincipal + "/clients/<default>"));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-name",
principal, "--entity-type", "clients");
- checkEntities(opts,
- Collections.singletonMap("users/" + sanitizedPrincipal +
"/clients", Collections.singletonList("client-4")),
- Collections.singletonList(sanitizedPrincipal +
"/clients/client-4"));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-default",
"--entity-type", "clients");
- checkEntities(opts,
- Collections.singletonMap("users/<default>/clients",
Collections.singletonList("client-5")),
- Collections.singletonList("<default>/clients/client-5"));
-
- opts = Arrays.asList("--entity-type", "users", "--entity-type",
"clients");
- Map<String, List<String>> userMap = Collections.singletonMap("users/"
+ sanitizedPrincipal + "/clients", Collections.singletonList("client-2"));
- Map<String, List<String>> defaultUserMap =
Collections.singletonMap("users/<default>/clients",
Collections.singletonList("client-3"));
- checkEntities(opts,
- concat(Collections.singletonMap("users",
Arrays.asList("<default>", sanitizedPrincipal)), defaultUserMap, userMap),
- Arrays.asList("<default>/clients/client-3", sanitizedPrincipal +
"/clients/client-2"));
- }
-
@Test
public void shouldAlterClientMetricsConfig() {
Node node = new Node(1, "localhost", 9092);
@@ -1972,67 +1307,6 @@ public class ConfigCommandTest {
assertEquals("An entity name must be specified with --alter of
client-metrics", exception.getMessage());
}
- @Test
- public void shouldNotSupportAlterClientMetricsWithZookeeperArg() {
- ConfigCommand.ConfigCommandOptions alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "sub",
- "--entity-type", "client-metrics",
- "--alter",
- "--add-config", "interval.ms=1000"));
-
- IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, alterOpts::checkArgs);
- assertEquals("Invalid entity type client-metrics, the entity type must
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-
- // Test for the --client-metrics alias
- alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--client-metrics", "sub",
- "--alter",
- "--add-config", "interval.ms=1000"));
-
- exception = assertThrows(IllegalArgumentException.class,
alterOpts::checkArgs);
- assertEquals("Invalid entity type client-metrics, the entity type must
be one of users, brokers with a --zookeeper argument", exception.getMessage());
- }
-
- @Test
- public void shouldNotSupportDescribeClientMetricsWithZookeeperArg() {
- ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "sub",
- "--entity-type", "client-metrics",
- "--describe"));
-
- IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, describeOpts::checkArgs);
- assertEquals("Invalid entity type client-metrics, the entity type must
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-
- // Test for the --client-metrics alias
- describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--client-metrics", "sub",
- "--describe"));
-
- exception = assertThrows(IllegalArgumentException.class,
describeOpts::checkArgs);
- assertEquals("Invalid entity type client-metrics, the entity type must
be one of users, brokers with a --zookeeper argument", exception.getMessage());
- }
-
- @Test
- public void shouldNotSupportAlterClientMetricsWithZookeeper() {
- ConfigCommand.ConfigCommandOptions alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "sub",
- "--entity-type", "client-metrics",
- "--alter",
- "--add-config", "interval.ms=1000"));
-
- IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, alterOpts::checkArgs);
- assertEquals("Invalid entity type client-metrics, the entity type must
be one of users, brokers with a --zookeeper argument", exception.getMessage());
-
- // Test for the --client-metrics alias
- alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--client-metrics", "sub",
- "--alter",
- "--add-config", "interval.ms=1000"));
-
- exception = assertThrows(IllegalArgumentException.class,
alterOpts::checkArgs);
- assertEquals("Invalid entity type client-metrics, the entity type must
be one of users, brokers with a --zookeeper argument", exception.getMessage());
- }
-
@Test
public void shouldAlterGroupConfig() {
Node node = new Node(1, "localhost", 9092);
@@ -2146,70 +1420,6 @@ public class ConfigCommandTest {
assertEquals("An entity name must be specified with --alter of
groups", exception.getMessage());
}
- @Test
- public void shouldNotSupportAlterGroupConfigWithZookeeperArg() {
- ConfigCommand.ConfigCommandOptions alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "group",
- "--entity-type", "groups",
- "--alter",
- "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
- IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, alterOpts::checkArgs);
- assertEquals("Invalid entity type groups, the entity type must be one
of users, brokers with a --zookeeper argument", exception.getMessage());
-
- // Test for the --group alias
- alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--group", "group",
- "--alter",
- "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
- exception = assertThrows(IllegalArgumentException.class,
alterOpts::checkArgs);
- assertEquals("Invalid entity type groups, the entity type must be one
of users, brokers with a --zookeeper argument", exception.getMessage());
-
- }
-
- @Test
- public void shouldNotSupportDescribeGroupConfigWithZookeeperArg() {
- ConfigCommand.ConfigCommandOptions describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "group",
- "--entity-type", "groups",
- "--describe"));
-
- IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, describeOpts::checkArgs);
- assertEquals("Invalid entity type groups, the entity type must be one
of users, brokers with a --zookeeper argument", exception.getMessage());
-
- // Test for the --group alias
- describeOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--group", "group",
- "--describe"));
-
- exception = assertThrows(IllegalArgumentException.class,
describeOpts::checkArgs);
- assertEquals("Invalid entity type groups, the entity type must be one
of users, brokers with a --zookeeper argument", exception.getMessage());
-
- }
-
- @Test
- public void shouldNotSupportAlterGroupConfigWithZookeeper() {
- ConfigCommand.ConfigCommandOptions alterOpts = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--entity-name", "group",
- "--entity-type", "groups",
- "--alter",
- "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
- IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, alterOpts, DUMMY_ADMIN_ZK_CLIENT));
- assertEquals("groups is not a known entityType. Should be one of
List(topics, clients, users, brokers, ips)", exception.getMessage());
-
- // Test for the --group alias
- ConfigCommand.ConfigCommandOptions alterOptsUsingAlias = new
ConfigCommand.ConfigCommandOptions(toArray("--zookeeper", ZK_CONNECT,
- "--group", "group",
- "--alter",
- "--add-config", "consumer.heartbeat.interval.ms=6000"));
-
- exception = assertThrows(IllegalArgumentException.class, () ->
ConfigCommand.alterConfigWithZk(null, alterOptsUsingAlias,
DUMMY_ADMIN_ZK_CLIENT));
- assertEquals("groups is not a known entityType. Should be one of
List(topics, clients, users, brokers, ips)", exception.getMessage());
-
- }
-
public static String[] toArray(String... first) {
return first;
}
@@ -2234,32 +1444,6 @@ public class ConfigCommandTest {
return res;
}
- static class DummyAdminZkClient extends AdminZkClient {
- public DummyAdminZkClient(KafkaZkClient zkClient) {
- super(zkClient, scala.None$.empty());
- }
-
- @Override
- public void changeBrokerConfig(Seq<Object> brokers, Properties
configs) {
- }
-
- @Override
- public Properties fetchEntityConfig(String rootEntityType, String
sanitizedEntityName) {
- return new Properties();
- }
-
- @Override
- public void changeClientIdConfig(String sanitizedClientId, Properties
configs) {
- }
-
- @Override
- public void changeUserOrUserClientIdConfig(String sanitizedEntityName,
Properties configs, boolean isUserClientId) {
- }
-
- @Override
- public void changeTopicConfig(String topic, Properties configs) {
- }
- }
static class DummyAdminClient extends MockAdminClient {
public DummyAdminClient(Node node) {