This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 21caf6b123d KAFKA-16629 Add broker-related tests to ConfigCommandIntegrationTest (#15840) 21caf6b123d is described below commit 21caf6b123dd70a68d258fd925785a529f3a48d9 Author: Ken Huang <100591800+m1a...@users.noreply.github.com> AuthorDate: Fri May 31 21:24:33 2024 +0900 KAFKA-16629 Add broker-related tests to ConfigCommandIntegrationTest (#15840) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- checkstyle/import-control-core.xml | 1 + .../src/main/scala/kafka/admin/ConfigCommand.scala | 27 +- core/src/main/scala/kafka/cluster/Broker.scala | 4 + .../kafka/admin/ConfigCommandIntegrationTest.java | 539 ++++++++++++++++----- .../test/java/kafka/admin/ConfigCommandTest.java | 11 +- 5 files changed, 438 insertions(+), 144 deletions(-) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index ed6c53a322b..6724ea9bf3b 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -130,5 +130,6 @@ <allow pkg="kafka.test"/> <allow pkg="kafka.test.annotation"/> <allow pkg="kafka.test.junit"/> + <allow pkg="org.apache.kafka.clients.admin" /> </subpackage> </import-control> diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 7421aed637d..04dce7a0255 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -19,7 +19,7 @@ package kafka.admin import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit -import java.util.{Collections, Properties} +import java.util.{Collections, Optional, Properties} import joptsimple._ import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig} import kafka.utils.Implicits._ @@ -210,15 +210,19 @@ object ConfigCommand extends Logging { } } - def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = { - encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG) - val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, - throw new IllegalArgumentException("Password encoder secret not specified")) + def createPasswordEncoder(encoderConfigs: java.util.Map[String, String]): PasswordEncoder = { + val encoderSecret = Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)) + .orElseThrow(() => new IllegalArgumentException("Password encoder secret not specified")) PasswordEncoder.encrypting(new Password(encoderSecret), null, - encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT), - encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT), - encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT)) + encoderConfigs.getOrDefault(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT), + Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG)) + .map[Int](Integer.parseInt) + .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT), + Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG)) + .map[Int](Integer.parseInt) + .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT) + ) } /** @@ -244,8 +248,11 @@ object ConfigCommand extends Logging { " to override the default encoding parameters. Password encoder configs will not be persisted" + " in ZooKeeper." ) - - val passwordEncoder = createPasswordEncoder(passwordEncoderConfigs.asScala) + val passwordConfigsMap = new java.util.HashMap[String, String] + passwordEncoderConfigs.forEach { (key, value) => + passwordConfigsMap.put(key.toString, value.toString) + } + val passwordEncoder = createPasswordEncoder(passwordConfigsMap) passwordConfigs.foreach { configName => val encodedValue = passwordEncoder.encode(new Password(configsToBeAdded.getProperty(configName))) configsToBeAdded.setProperty(configName, encodedValue) diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index ede63cd3c0a..e5835201fa3 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -43,6 +43,10 @@ object Broker { new Broker(id, endPoints, rack, emptySupportedFeatures) } + def apply(id: Int, endPoint: EndPoint, rack: Option[String]): Broker = { + new Broker(id, Seq(endPoint), rack, emptySupportedFeatures) + } + private def supportedFeatures(features: java.util.Map[String, VersionRange]): java.util .Map[String, SupportedVersionRange] = { features.asScala.map { case (name, range) => diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index ab0f30f49b6..23c250a764e 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -26,23 +26,22 @@ import kafka.test.junit.ZkClusterInvocationContext; import kafka.zk.AdminZkClient; import kafka.zk.BrokerInfo; import kafka.zk.KafkaZkClient; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.security.PasswordEncoder; -import org.apache.kafka.security.PasswordEncoderConfigs; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ZooKeeperInternals; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; -import scala.collection.JavaConverters; -import scala.collection.Seq; +import org.junit.platform.commons.util.StringUtils; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -50,234 +49,512 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG; +import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG; +import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG; +import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG; +import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class ConfigCommandIntegrationTest { - AdminZkClient adminZkClient; - List<String> alterOpts; + private List<String> alterOpts; + private final String defaultBrokerId = "0"; private final ClusterInstance cluster; + private static Runnable run(Stream<String> command) { + return () -> { + try { + ConfigCommand.main(command.toArray(String[]::new)); + } catch (RuntimeException e) { + // do nothing. + } finally { + Exit.resetExitProcedure(); + } + }; + } + public ConfigCommandIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } - @ClusterTest(types = {Type.ZK, Type.KRAFT}) + @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-name", cluster.isKRaftTest() ? "0" : "1", "--entity-type", "brokers", "--alter", "--add-config", "security.inter.broker.protocol=PLAINTEXT")), - errOut -> - assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); + errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); } - @ClusterTest(types = {Type.ZK}) public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-type", "users", "--entity-name", "admin", "--alter", "--add-config", "consumer_byte_rate=20000")), - errOut -> - assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); + errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); } - public static void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) { - AtomicReference<Integer> exitStatus = new AtomicReference<>(); - Exit.setExitProcedure((status, __) -> { - exitStatus.set(status); - throw new RuntimeException(); - }); - - String errOut = captureStandardErr(() -> { - try { - ConfigCommand.main(args.toArray(String[]::new)); - } catch (RuntimeException e) { - // do nothing. - } finally { - Exit.resetExitProcedure(); - } - }); - - checkErrOut.accept(errOut); - assertNotNull(exitStatus.get()); - assertEquals(1, exitStatus.get()); - } - - private Stream<String> quorumArgs() { - return cluster.isKRaftTest() - ? Stream.of("--bootstrap-server", cluster.bootstrapServers()) - : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); - } - - public List<String> entityOp(Optional<String> brokerId) { - return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default")); - } - - public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception { - alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap()); - } - - public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId, Map<String, String> encoderConfigs) { - String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet()) - .flatMap(Set::stream) - .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.joining(",")); - ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr))); - ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); - } - - void verifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) { - Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); - assertEquals(configs, entityConfigs); - } - - void alterAndVerifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception { - alterConfigWithZk(zkClient, configs, brokerId); - verifyConfig(zkClient, configs, brokerId); - } + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testNullStatusOnKraftCommandAlterUserQuota() { + Stream<String> command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "users", + "--entity-name", "admin", + "--alter", "--add-config", "consumer_byte_rate=20000")); + String message = captureStandardMsg(run(command)); - void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String> configNames, Optional<String> brokerId) { - ConfigCommand.ConfigCommandOptions deleteOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--delete-config", String.join(",", configNames)))); - ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient); - verifyConfig(zkClient, Collections.emptyMap(), brokerId); + assertTrue(StringUtils.isBlank(message), message); } - @ClusterTest(types = {Type.ZK}) + @ClusterTest(types = Type.ZK) public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { cluster.shutdownBroker(0); String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); String brokerId = "1"; - adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); - alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty()); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("message.max.bytes", "110000")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singletonMap("message.max.bytes", "120000")); // Change config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty()); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("message.max.bytes", "130000")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singletonMap("message.max.bytes", "140000")); // Delete config - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singleton("message.max.bytes")); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singleton("message.max.bytes")); // Listener configs: should work only with listener name - alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); - deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singleton("listener.name.internal.ssl.keystore.location")); // Password config update without encoder secret should fail assertThrows(IllegalArgumentException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.external.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map<String, String> configs = new HashMap<>(); configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); - Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); + Map<String, String> encoderConfigs = new HashMap<>(configs); + encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs); Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId); - assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); + assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password"); - PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)); + PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs); assertEquals("secret", passwordEncoder.decode(encodedPassword).value()); assertEquals(configs.size(), brokerConfigs.size()); // Password config update with overrides for encoder parameters - Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); - Map<String, String> encoderConfigs2 = new HashMap<>(); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64"); - alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2); + Map<String, String> encoderConfigs2 = generateEncodeConfig(); + alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs2); Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId); - String encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password"); - assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value()); - assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value()); + String encodedPassword2 = brokerConfigs2.getProperty("listener.name.external.ssl.keystore.password"); + assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs) + .decode(encodedPassword2).value()); + assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2) + .decode(encodedPassword2).value()); // Password config update at default cluster-level should fail - assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs)); + assertThrows(ConfigException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), encoderConfigs)); // Dynamic config updates using ZK should fail if broker is running. registerBrokerInZk(zkClient, Integer.parseInt(brokerId)); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId))); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty())); + assertThrows(IllegalArgumentException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, + Optional.of(brokerId), singletonMap("message.max.bytes", "210000"))); + assertThrows(IllegalArgumentException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, + Optional.empty(), singletonMap("message.max.bytes", "220000"))); // Dynamic config updates using ZK should for a different broker that is not running should succeed - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), singletonMap("message.max.bytes", "230000")); + } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception { + alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers()); + + try (Admin client = cluster.createAdminClient()) { + // Add config + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.bytes", "110000")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "120000")); + + // Change config + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.bytes", "130000")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "140000")); + + // Delete config + deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.bytes")); + + // Listener configs: should work only with listener name + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")); + alterConfigWithKraft(client, Optional.empty(), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")); + deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), + singleton("listener.name.internal.ssl.keystore.location")); + alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("listener.name.external.ssl.keystore.password", "secret")); + + // Password config update with encoder secret should succeed and encoded password must be stored in ZK + Map<String, String> configs = new HashMap<>(); + configs.put("listener.name.external.ssl.keystore.password", "secret"); + configs.put("log.cleaner.threads", "2"); + // Password encoder configs + configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + + // Password config update at default cluster-level should fail + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs)); + } + } + + @ClusterTest(types = {Type.ZK}) + public void testAlterReadOnlyConfigInZookeeperThenShouldFail() { + cluster.shutdownBroker(0); + String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); + KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = generateDefaultAlterOpts(zkConnect); + + assertThrows(ConfigException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), + singletonMap("auto.create.topics.enable", "false"))); + assertThrows(ConfigException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), + singletonMap("auto.leader.rebalance.enable", "false"))); + assertThrows(ConfigException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), + singletonMap("broker.id", "1"))); + } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testAlterReadOnlyConfigInKRaftThenShouldFail() { + alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers()); + + try (Admin client = cluster.createAdminClient()) { + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("auto.create.topics.enable", "false"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("auto.leader.rebalance.enable", "false"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("broker.id", "1"))); + } + } + + @ClusterTest(types = {Type.ZK}) + public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() { + cluster.shutdownBroker(0); + String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); + KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = generateDefaultAlterOpts(zkConnect); + + Map<String, String> configs = new HashMap<>(); + configs.put("log.flush.interval.messages", "100"); + configs.put("log.retention.bytes", "20"); + configs.put("log.retention.ms", "2"); + + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs); + } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() throws Exception { + alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers()); + + try (Admin client = cluster.createAdminClient()) { + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("log.flush.interval.messages", "100")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("log.retention.bytes", "20")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("log.retention.ms", "2")); + } + } + + @ClusterTest(types = {Type.ZK}) + public void testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() { + cluster.shutdownBroker(0); + String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); + KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = generateDefaultAlterOpts(zkConnect); + + String listenerName = "listener.name.internal."; + String sslTruststoreType = listenerName + "ssl.truststore.type"; + String sslTruststoreLocation = listenerName + "ssl.truststore.location"; + String sslTruststorePassword = listenerName + "ssl.truststore.password"; + + Map<String, String> configs = new HashMap<>(); + configs.put(sslTruststoreType, "PKCS12"); + configs.put(sslTruststoreLocation, "/temp/test.jks"); + configs.put("password.encoder.secret", "encoder-secret"); + configs.put(sslTruststorePassword, "password"); + + alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs); + + Properties properties = zkClient.getEntityConfigs("brokers", defaultBrokerId); + assertTrue(properties.containsKey(sslTruststorePassword)); + assertEquals(configs.get(sslTruststoreType), properties.getProperty(sslTruststoreType)); + assertEquals(configs.get(sslTruststoreLocation), properties.getProperty(sslTruststoreLocation)); + } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws Exception { + alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers()); + String listenerName = "listener.name.internal."; + + try (Admin client = cluster.createAdminClient()) { + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap(listenerName + "ssl.truststore.type", "PKCS12")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap(listenerName + "ssl.truststore.location", "/temp/test.jks")); + + alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap(listenerName + "ssl.truststore.password", "password")); + verifyConfigDefaultValue(client, Optional.of(defaultBrokerId), + singleton(listenerName + "ssl.truststore.password")); + } + } + + @ClusterTest(types = {Type.ZK}) + public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() { + cluster.shutdownBroker(0); + String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); + KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = generateDefaultAlterOpts(zkConnect); + + assertThrows(ConfigException.class, () -> + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.type", "PKCS12"))); + assertThrows(ConfigException.class, () -> + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.location", "/temp/test.jks"))); + assertThrows(ConfigException.class, () -> + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.password", "password"))); + } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testUpdatePerBrokerConfigInKRaftThenShouldFail() { + alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers()); + + try (Admin client = cluster.createAdminClient()) { + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.type", "PKCS12"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.location", "/temp/test.jks"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.password", "password"))); + } + } + + private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) { + AtomicReference<Integer> exitStatus = new AtomicReference<>(); + Exit.setExitProcedure((status, __) -> { + exitStatus.set(status); + throw new RuntimeException(); + }); + + String errOut = captureStandardMsg(run(args)); + + checkErrOut.accept(errOut); + assertNotNull(exitStatus.get()); + assertEquals(1, exitStatus.get()); + } + + private Stream<String> quorumArgs() { + return cluster.isKRaftTest() + ? Stream.of("--bootstrap-server", cluster.bootstrapServers()) + : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); + } + + private void verifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String> config) { + Properties entityConfigs = zkClient.getEntityConfigs("brokers", + brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); + assertEquals(config, entityConfigs); + } + + private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient, + Optional<String> brokerId, Map<String, String> configs) { + alterConfigWithZk(zkClient, adminZkClient, brokerId, configs); + verifyConfig(zkClient, brokerId, configs); + } + + private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient adminZkClient, + Optional<String> brokerId, Map<String, String> config) { + String configStr = transferConfigMapToString(config); + ConfigCommand.ConfigCommandOptions addOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr))); + ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); + } + + private List<String> entityOp(Optional<String> brokerId) { + return brokerId.map(id -> asList("--entity-name", id)) + .orElse(singletonList("--entity-default")); + } + + private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient, + Optional<String> brokerId, Set<String> configNames) { + ConfigCommand.ConfigCommandOptions deleteOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), + asList("--delete-config", String.join(",", configNames)))); + ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient); + verifyConfig(zkClient, brokerId, Collections.emptyMap()); + } + + private Map<String, String> generateEncodeConfig() { + Map<String, String> map = new HashMap<>(); + map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding"); + map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024"); + map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1"); + map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64"); + map.put("listener.name.external.ssl.keystore.password", "secret2"); + return map; } private void registerBrokerInZk(KafkaZkClient zkClient, int id) { zkClient.createTopLevelPaths(); SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT; - EndPoint endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol); - BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, seq(endpoint), scala.None$.empty()), MetadataVersion.latestTesting(), 9192); + EndPoint endpoint = new EndPoint("localhost", 9092, + ListenerName.forSecurityProtocol(securityProtocol), securityProtocol); + BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint, + scala.None$.empty()), MetadataVersion.latestTesting(), 9192); zkClient.registerBroker(brokerInfo); } - @SafeVarargs - static <T> Seq<T> seq(T...seq) { - return seq(Arrays.asList(seq)); + private List<String> generateDefaultAlterOpts(String bootstrapServers) { + return asList("--bootstrap-server", bootstrapServers, + "--entity-type", "brokers", + "--entity-name", "0", "--alter"); + } + + private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception { + alterConfigWithKraft(client, brokerId, config); + verifyConfig(client, brokerId, config); + } + + private void alterConfigWithKraft(Admin client, Optional<String> brokerId, Map<String, String> config) { + String configStr = transferConfigMapToString(config); + ConfigCommand.ConfigCommandOptions addOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr))); + ConfigCommand.alterConfig(client, addOpts); + } + + private void verifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId)); + TestUtils.waitForCondition(() -> { + Map<String, String> current = client.describeConfigs(singletonList(configResource)) + .all() + .get() + .values() + .stream() + .flatMap(e -> e.entries().stream()) + .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll); + return config.entrySet().stream().allMatch(e -> e.getValue().equals(current.get(e.getKey()))); + }, 10000, config + " are not updated"); + } + + private void deleteAndVerifyConfig(Admin client, Optional<String> brokerId, Set<String> config) throws Exception { + ConfigCommand.ConfigCommandOptions deleteOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), + asList("--delete-config", String.join(",", config)))); + ConfigCommand.alterConfig(client, deleteOpts); + verifyConfigDefaultValue(client, brokerId, config); } - @SuppressWarnings({"deprecation"}) - static <T> Seq<T> seq(Collection<T> seq) { - return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); + private void verifyConfigDefaultValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId)); + TestUtils.waitForCondition(() -> { + Map<String, String> current = client.describeConfigs(singletonList(configResource)) + .all() + .get() + .values() + .stream() + .flatMap(e -> e.entries().stream()) + .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll); + return config.stream().allMatch(current::containsKey); + }, 5000, config + " are not updated"); } @SafeVarargs - public static String[] toArray(List<String>... lists) { + private static String[] toArray(List<String>... lists) { return Stream.of(lists).flatMap(List::stream).toArray(String[]::new); } - public static String captureStandardErr(Runnable runnable) { - return captureStandardStream(true, runnable); + private String captureStandardMsg(Runnable runnable) { + return captureStandardStream(runnable); + } + + private String transferConfigMapToString(Map<String, String> configs) { + return configs.entrySet() + .stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(",")); } - private static String captureStandardStream(boolean isErr, Runnable runnable) { + private String captureStandardStream(Runnable runnable) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - PrintStream currentStream = isErr ? System.err : System.out; - PrintStream tempStream = new PrintStream(outputStream); - if (isErr) + PrintStream currentStream = System.err; + try (PrintStream tempStream = new PrintStream(outputStream)) { System.setErr(tempStream); - else - System.setOut(tempStream); - try { - runnable.run(); - return outputStream.toString().trim(); - } finally { - if (isErr) + try { + runnable.run(); + return outputStream.toString().trim(); + } finally { System.setErr(currentStream); - else - System.setOut(currentStream); - - tempStream.close(); + } } } } diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java index afb22a06c24..5968f3706a1 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java @@ -421,8 +421,8 @@ public class ConfigCommandTest { public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String...args) { ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0), connectOpts.get(1), "--describe"), Arrays.asList(args))); createOpts.checkArgs(); - assertEquals(createOpts.entityTypes().toSeq(), ConfigCommandIntegrationTest.seq(expectedTypes)); - assertEquals(createOpts.entityNames().toSeq(), ConfigCommandIntegrationTest.seq(expectedNames)); + assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes)); + assertEquals(createOpts.entityNames().toSeq(), seq(expectedNames)); } public void doTestOptionEntityTypeNames(boolean zkConfig) { @@ -1710,7 +1710,7 @@ public class ConfigCommandTest { public void checkEntities(List<String> opts, Map<String, List<String>> expectedFetches, List<String> expectedEntityNames) { ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(new ConfigCommand.ConfigCommandOptions(toArray(opts, Collections.singletonList("--describe")))); expectedFetches.forEach((name, values) -> - when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(ConfigCommandIntegrationTest.seq(values))); + when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(seq(values))); Seq<ConfigCommand.ConfigEntity> entities0 = entity.getAllEntities(zkClient); List<ConfigCommand.ConfigEntity> entities = new ArrayList<>(); entities0.foreach(e -> { @@ -1996,4 +1996,9 @@ public class ConfigCommandTest { return mock(AlterClientQuotasResult.class); } } + + @SuppressWarnings({"deprecation"}) + private <T> Seq<T> seq(Collection<T> seq) { + return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); + } }