dajac commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1860381480
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +173,26 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
- case ConfigType.TOPIC =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER |
ConfigType.GROUP =>
+ val configResourceType = entityTypeHead match {
+ case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+ case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS
Review Comment:
nit: A space is missing after `=>`.
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +173,26 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
- case ConfigType.TOPIC =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER |
ConfigType.GROUP =>
+ val configResourceType = entityTypeHead match {
+ case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+ case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS
+ case ConfigType.BROKER => ConfigResource.Type.BROKER
+ case ConfigType.GROUP => ConfigResource.Type.GROUP
+ }
+ try {
+ alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, configResourceType)
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case _: UnsupportedVersionException if entityTypeHead ==
ConfigType.BROKER =>
+ System.err.println(s"Could not update broker config
$entityNameHead, because brokers don't support api
${ApiKeys.INCREMENTAL_ALTER_CONFIGS},"
+ + " You can upgrade your brokers to version 2.3.0 or newer to
avoid this error.")
+ return
Review Comment:
I find it a little weird that we log the error here instead of doing it in the
`main` function. Is there a particular reason for doing it this way? My concern
is that the command does not exit with `Exit.exit(1)` as it should. Could we
catch it in `main` by adding a new case? An alternative would be to still catch
it here but rethrow an `UnsupportedVersionException` with the message that we
want, and catch that in `main`. What do you think?
##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -333,6 +333,9 @@ object TestUtils extends Logging {
if
(!props.containsKey(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG))
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
rack.foreach(props.put(ServerConfigs.BROKER_RACK_CONFIG, _))
+ // reduce log cleaner offset map memory usage, must be at greater than 1MB
per cleaner thread, set to 2M+2 so that
+ // we can set 2 cleaner threads.
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154")
Review Comment:
Is this something that we could set at the test level? You can pass extra
configs to `@ClusterTest`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]