This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21caf6b123d KAFKA-16629 Add broker-related tests to ConfigCommandIntegrationTest (#15840)
21caf6b123d is described below

commit 21caf6b123dd70a68d258fd925785a529f3a48d9
Author: Ken Huang <100591800+m1a...@users.noreply.github.com>
AuthorDate: Fri May 31 21:24:33 2024 +0900

    KAFKA-16629 Add broker-related tests to ConfigCommandIntegrationTest (#15840)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 checkstyle/import-control-core.xml                 |   1 +
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  27 +-
 core/src/main/scala/kafka/cluster/Broker.scala     |   4 +
 .../kafka/admin/ConfigCommandIntegrationTest.java  | 539 ++++++++++++++++-----
 .../test/java/kafka/admin/ConfigCommandTest.java   |  11 +-
 5 files changed, 438 insertions(+), 144 deletions(-)

diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index ed6c53a322b..6724ea9bf3b 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -130,5 +130,6 @@
     <allow pkg="kafka.test"/>
     <allow pkg="kafka.test.annotation"/>
     <allow pkg="kafka.test.junit"/>
+    <allow pkg="org.apache.kafka.clients.admin" />
   </subpackage>
 </import-control>
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 7421aed637d..04dce7a0255 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -19,7 +19,7 @@ package kafka.admin
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import joptsimple._
 import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
 import kafka.utils.Implicits._
@@ -210,15 +210,19 @@ object ConfigCommand extends Logging {
     }
   }
 
-  def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
-    encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
-    val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
-      throw new IllegalArgumentException("Password encoder secret not specified"))
+  def createPasswordEncoder(encoderConfigs: java.util.Map[String, String]): PasswordEncoder = {
+    val encoderSecret = Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
+      .orElseThrow(() => new IllegalArgumentException("Password encoder secret not specified"))
     PasswordEncoder.encrypting(new Password(encoderSecret),
       null,
-      encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
-      encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
-      encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT))
+      encoderConfigs.getOrDefault(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
+      Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG))
+        .map[Int](Integer.parseInt)
+        .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
+      Optional.ofNullable(encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG))
+        .map[Int](Integer.parseInt)
+        .orElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT)
+    )
   }
 
   /**
@@ -244,8 +248,11 @@ object ConfigCommand extends Logging {
           " to override the default encoding parameters. Password encoder 
configs will not be persisted" +
           " in ZooKeeper."
       )
-
-      val passwordEncoder = createPasswordEncoder(passwordEncoderConfigs.asScala)
+      val passwordConfigsMap = new java.util.HashMap[String, String]
+      passwordEncoderConfigs.forEach { (key, value) =>
+        passwordConfigsMap.put(key.toString, value.toString)
+      }
+      val passwordEncoder = createPasswordEncoder(passwordConfigsMap)
       passwordConfigs.foreach { configName =>
         val encodedValue = passwordEncoder.encode(new Password(configsToBeAdded.getProperty(configName)))
         configsToBeAdded.setProperty(configName, encodedValue)
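
Note on the createPasswordEncoder hunk above: the method now takes a java.util.Map and handles absent keys with java.util.Optional instead of Scala's Map#getOrElse. Detached from ConfigCommand, the pattern reduces to plain Java along these lines (a minimal sketch; the class and method names are illustrative, and 128 is assumed here only for the example):

    import java.util.Map;
    import java.util.Optional;

    final class EncoderDefaults {
        // Required key: fail fast when absent, mirroring the orElseThrow call above.
        static String requiredSecret(Map<String, String> configs) {
            return Optional.ofNullable(configs.get("password.encoder.secret"))
                    .orElseThrow(() -> new IllegalArgumentException("Password encoder secret not specified"));
        }

        // Optional numeric key: parse only when present, else fall back to a default.
        static int keyLength(Map<String, String> configs) {
            return Optional.ofNullable(configs.get("password.encoder.key.length"))
                    .map(Integer::parseInt)
                    .orElse(128); // illustrative default for this sketch
        }
    }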
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index ede63cd3c0a..e5835201fa3 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -43,6 +43,10 @@ object Broker {
     new Broker(id, endPoints, rack, emptySupportedFeatures)
   }
 
+  def apply(id: Int, endPoint: EndPoint, rack: Option[String]): Broker = {
+    new Broker(id, Seq(endPoint), rack, emptySupportedFeatures)
+  }
+
   private def supportedFeatures(features: java.util.Map[String, VersionRange]): java.util
   .Map[String, SupportedVersionRange] = {
     features.asScala.map { case (name, range) =>
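
The new Broker.apply overload above exists so Java callers can register a broker with a single EndPoint without bridging into a Scala Seq first. Usage from the integration test below is roughly (illustrative broker id):

    // The overload wraps the lone EndPoint in a Seq internally, so the Java
    // test no longer needs a JavaConverters-based seq(...) helper.
    Broker broker = Broker.apply(0, endpoint, scala.None$.empty());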
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index ab0f30f49b6..23c250a764e 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -26,23 +26,22 @@ import kafka.test.junit.ZkClusterInvocationContext;
 import kafka.zk.AdminZkClient;
 import kafka.zk.BrokerInfo;
 import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.security.PasswordEncoder;
-import org.apache.kafka.security.PasswordEncoderConfigs;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.junit.platform.commons.util.StringUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -50,234 +49,512 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG;
+import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
 @ExtendWith(value = ClusterTestExtensions.class)
 @Tag("integration")
 public class ConfigCommandIntegrationTest {
-    AdminZkClient adminZkClient;
-    List<String> alterOpts;
 
+    private List<String> alterOpts;
+    private final String defaultBrokerId = "0";
     private final ClusterInstance cluster;
 
+    private static Runnable run(Stream<String> command) {
+        return () -> {
+            try {
+                ConfigCommand.main(command.toArray(String[]::new));
+            } catch (RuntimeException e) {
+                // do nothing.
+            } finally {
+                Exit.resetExitProcedure();
+            }
+        };
+    }
+
     public ConfigCommandIntegrationTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
-    @ClusterTest(types = {Type.ZK, Type.KRAFT})
+    @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
     public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-name", cluster.isKRaftTest() ? "0" : "1",
             "--entity-type", "brokers",
             "--alter",
             "--add-config", "security.inter.broker.protocol=PLAINTEXT")),
-            errOut ->
-                assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
+            errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
     }
 
-
     @ClusterTest(types = {Type.ZK})
     public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-type", "users",
             "--entity-name", "admin",
             "--alter", "--add-config", "consumer_byte_rate=20000")),
-            errOut ->
-                assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut));
+            errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut));
     }
 
-    public static void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
-        AtomicReference<Integer> exitStatus = new AtomicReference<>();
-        Exit.setExitProcedure((status, __) -> {
-            exitStatus.set(status);
-            throw new RuntimeException();
-        });
-
-        String errOut = captureStandardErr(() -> {
-            try {
-                ConfigCommand.main(args.toArray(String[]::new));
-            } catch (RuntimeException e) {
-                // do nothing.
-            } finally {
-                Exit.resetExitProcedure();
-            }
-        });
-
-        checkErrOut.accept(errOut);
-        assertNotNull(exitStatus.get());
-        assertEquals(1, exitStatus.get());
-    }
-
-    private Stream<String> quorumArgs() {
-        return cluster.isKRaftTest()
-            ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
-            : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect());
-    }
-
-    public List<String> entityOp(Optional<String> brokerId) {
-        return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default"));
-    }
-
-    public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception {
-        alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap());
-    }
-
-    public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId, Map<String, String> encoderConfigs) {
-        String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet())
-            .flatMap(Set::stream)
-            .map(e -> e.getKey() + "=" + e.getValue())
-            .collect(Collectors.joining(","));
-        ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr)));
-        ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
-    }
-
-    void verifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) {
-        Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
-        assertEquals(configs, entityConfigs);
-    }
-
-    void alterAndVerifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception {
-        alterConfigWithZk(zkClient, configs, brokerId);
-        verifyConfig(zkClient, configs, brokerId);
-    }
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testNullStatusOnKraftCommandAlterUserQuota() {
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "users",
+            "--entity-name", "admin",
+            "--alter", "--add-config", "consumer_byte_rate=20000"));
+        String message = captureStandardMsg(run(command));
 
-    void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String> configNames, Optional<String> brokerId) {
-        ConfigCommand.ConfigCommandOptions deleteOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--delete-config", String.join(",", configNames))));
-        ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
-        verifyConfig(zkClient, Collections.emptyMap(), brokerId);
+
+        assertTrue(StringUtils.isBlank(message), message);
 
-    @ClusterTest(types = {Type.ZK})
+    @ClusterTest(types = Type.ZK)
     public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
         cluster.shutdownBroker(0);
         String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
         KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
 
         String brokerId = "1";
-        adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
 
         // Add config
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("message.max.bytes", "110000"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+                singletonMap("message.max.bytes", "120000"));
 
         // Change config
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("message.max.bytes", "130000"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+                singletonMap("message.max.bytes", "140000"));
 
         // Delete config
-        deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId));
-        deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty());
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singleton("message.max.bytes"));
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+                singleton("message.max.bytes"));
 
         // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
+                        singletonMap("ssl.keystore.location", "/tmp/test.jks")));
 
         // Per-broker config configured at default cluster-level should fail
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty()));
-        deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId));
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(),
+                        singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")));
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singleton("listener.name.internal.ssl.keystore.location"));
 
         // Password config update without encoder secret should fail
         assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
+                        singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
         // Password config update with encoder secret should succeed and encoded password must be stored in ZK
         Map<String, String> configs = new HashMap<>();
         configs.put("listener.name.external.ssl.keystore.password", "secret");
         configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs);
+        Map<String, String> encoderConfigs = new HashMap<>(configs);
+        encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs);
         Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId);
-        assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
+        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
         assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded
         String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
-        PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
+        PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs);
         assertEquals("secret", passwordEncoder.decode(encodedPassword).value());
         assertEquals(configs.size(), brokerConfigs.size());
 
         // Password config update with overrides for encoder parameters
-        Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2");
-        Map<String, String> encoderConfigs2 = new HashMap<>();
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1");
-        encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
-        alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2);
+        Map<String, String> encoderConfigs2 = generateEncodeConfig();
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs2);
         Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId);
-        String encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
-        assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
-        assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
+        String encodedPassword2 = brokerConfigs2.getProperty("listener.name.external.ssl.keystore.password");
+        assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs)
+                .decode(encodedPassword2).value());
+        assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2)
+                .decode(encodedPassword2).value());
 
         // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs));
+        assertThrows(ConfigException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), encoderConfigs));
 
         // Dynamic config updates using ZK should fail if broker is running.
         registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId)));
-        assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty()));
+        assertThrows(IllegalArgumentException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient,
+                        Optional.of(brokerId), singletonMap("message.max.bytes", "210000")));
+        assertThrows(IllegalArgumentException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient,
+                        Optional.empty(), singletonMap("message.max.bytes", "220000")));
 
         // Dynamic config updates using ZK for a different broker that is not running should succeed
-        alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), singletonMap("message.max.bytes", "230000"));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            // Add config
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.bytes", "110000"));
+            alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "120000"));
+
+            // Change config
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.bytes", "130000"));
+            alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "140000"));
+
+            // Delete config
+            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.bytes"));
+
+            // Listener configs: should work only with listener name
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
+            alterConfigWithKraft(client, Optional.empty(),
+                    singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
+            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singleton("listener.name.internal.ssl.keystore.location"));
+            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap("listener.name.external.ssl.keystore.password", "secret"));
+
+            // Password config update with encoder secret should succeed and encoded password must be stored in ZK
+            Map<String, String> configs = new HashMap<>();
+            configs.put("listener.name.external.ssl.keystore.password", "secret");
+            configs.put("log.cleaner.threads", "2");
+            // Password encoder configs
+            configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+
+            // Password config update at default cluster-level should fail
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testAlterReadOnlyConfigInZookeeperThenShouldFail() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        assertThrows(ConfigException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                        singletonMap("auto.create.topics.enable", "false")));
+        assertThrows(ConfigException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                        singletonMap("auto.leader.rebalance.enable", "false")));
+        assertThrows(ConfigException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                        singletonMap("broker.id", "1")));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                            singletonMap("auto.create.topics.enable", "false")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                            singletonMap("auto.leader.rebalance.enable", "false")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                            singletonMap("broker.id", "1")));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("log.flush.interval.messages", "100");
+        configs.put("log.retention.bytes", "20");
+        configs.put("log.retention.ms", "2");
+
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() throws Exception {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("log.flush.interval.messages", "100"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("log.retention.bytes", "20"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("log.retention.ms", "2"));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        String listenerName = "listener.name.internal.";
+        String sslTruststoreType = listenerName + "ssl.truststore.type";
+        String sslTruststoreLocation = listenerName + "ssl.truststore.location";
+        String sslTruststorePassword = listenerName + "ssl.truststore.password";
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put(sslTruststoreType, "PKCS12");
+        configs.put(sslTruststoreLocation, "/temp/test.jks");
+        configs.put("password.encoder.secret", "encoder-secret");
+        configs.put(sslTruststorePassword, "password");
+
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
+
+        Properties properties = zkClient.getEntityConfigs("brokers", defaultBrokerId);
+        assertTrue(properties.containsKey(sslTruststorePassword));
+        assertEquals(configs.get(sslTruststoreType), properties.getProperty(sslTruststoreType));
+        assertEquals(configs.get(sslTruststoreLocation), properties.getProperty(sslTruststoreLocation));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws Exception {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+        String listenerName = "listener.name.internal.";
+
+        try (Admin client = cluster.createAdminClient()) {
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap(listenerName + "ssl.truststore.type", "PKCS12"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap(listenerName + "ssl.truststore.location", "/temp/test.jks"));
+
+            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    singletonMap(listenerName + "ssl.truststore.password", "password"));
+            verifyConfigDefaultValue(client, Optional.of(defaultBrokerId),
+                    singleton(listenerName + "ssl.truststore.password"));
+        }
+    }
+
+    @ClusterTest(types = {Type.ZK})
+    public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() {
+        cluster.shutdownBroker(0);
+        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
+        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
+        alterOpts = generateDefaultAlterOpts(zkConnect);
+
+        assertThrows(ConfigException.class, () ->
+                alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                        singletonMap("ssl.truststore.type", "PKCS12")));
+        assertThrows(ConfigException.class, () ->
+                alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                        singletonMap("ssl.truststore.location", "/temp/test.jks")));
+        assertThrows(ConfigException.class, () ->
+                alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
+                        singletonMap("ssl.truststore.password", "password")));
+    }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
+        alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+        try (Admin client = cluster.createAdminClient()) {
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                            singletonMap("ssl.truststore.type", "PKCS12")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                            singletonMap("ssl.truststore.location", "/temp/test.jks")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                            singletonMap("ssl.truststore.password", "password")));
+        }
+    }
+
+    private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
+        AtomicReference<Integer> exitStatus = new AtomicReference<>();
+        Exit.setExitProcedure((status, __) -> {
+            exitStatus.set(status);
+            throw new RuntimeException();
+        });
+
+        String errOut = captureStandardMsg(run(args));
+
+        checkErrOut.accept(errOut);
+        assertNotNull(exitStatus.get());
+        assertEquals(1, exitStatus.get());
+    }
+
+    private Stream<String> quorumArgs() {
+        return cluster.isKRaftTest()
+                ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
+                : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect());
+    }
+
+    private void verifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String> config) {
+        Properties entityConfigs = zkClient.getEntityConfigs("brokers",
+                brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
+        assertEquals(config, entityConfigs);
+    }
+
+    private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
+                                      Optional<String> brokerId, Map<String, String> configs) {
+        alterConfigWithZk(zkClient, adminZkClient, brokerId, configs);
+        verifyConfig(zkClient, brokerId, configs);
+    }
+
+    private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient adminZkClient,
+                                   Optional<String> brokerId, Map<String, String> config) {
+        String configStr = transferConfigMapToString(config);
+        ConfigCommand.ConfigCommandOptions addOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr)));
+        ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
+    }
+
+    private List<String> entityOp(Optional<String> brokerId) {
+        return brokerId.map(id -> asList("--entity-name", id))
+                .orElse(singletonList("--entity-default"));
+    }
+
+    private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
+                                       Optional<String> brokerId, Set<String> configNames) {
+        ConfigCommand.ConfigCommandOptions deleteOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
+                        asList("--delete-config", String.join(",", configNames))));
+        ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
+        verifyConfig(zkClient, brokerId, Collections.emptyMap());
+    }
+
+    private Map<String, String> generateEncodeConfig() {
+        Map<String, String> map = new HashMap<>();
+        map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding");
+        map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
+        map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1");
+        map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
+        map.put("listener.name.external.ssl.keystore.password", "secret2");
+        return map;
     }
 
     private void registerBrokerInZk(KafkaZkClient zkClient, int id) {
         zkClient.createTopLevelPaths();
         SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
-        EndPoint endpoint = new EndPoint("localhost", 9092, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
-        BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, seq(endpoint), scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
+        EndPoint endpoint = new EndPoint("localhost", 9092,
+                ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
+        BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint,
+                scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
         zkClient.registerBroker(brokerInfo);
     }
 
-    @SafeVarargs
-    static <T> Seq<T> seq(T...seq) {
-        return seq(Arrays.asList(seq));
+    private List<String> generateDefaultAlterOpts(String bootstrapServers) {
+        return asList("--bootstrap-server", bootstrapServers,
+                "--entity-type", "brokers",
+                "--entity-name", "0", "--alter");
+    }
+
+    private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception {
+        alterConfigWithKraft(client, brokerId, config);
+        verifyConfig(client, brokerId, config);
+    }
+
+    private void alterConfigWithKraft(Admin client, Optional<String> brokerId, Map<String, String> config) {
+        String configStr = transferConfigMapToString(config);
+        ConfigCommand.ConfigCommandOptions addOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr)));
+        ConfigCommand.alterConfig(client, addOpts);
+    }
+
+    private void verifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        TestUtils.waitForCondition(() -> {
+            Map<String, String> current = client.describeConfigs(singletonList(configResource))
+                    .all()
+                    .get()
+                    .values()
+                    .stream()
+                    .flatMap(e -> e.entries().stream())
+                    .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll);
+            return config.entrySet().stream().allMatch(e -> e.getValue().equals(current.get(e.getKey())));
+        }, 10000, config + " are not updated");
+    }
+
+    private void deleteAndVerifyConfig(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
+        ConfigCommand.ConfigCommandOptions deleteOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
+                        asList("--delete-config", String.join(",", config))));
+        ConfigCommand.alterConfig(client, deleteOpts);
+        verifyConfigDefaultValue(client, brokerId, config);
     }
 
-    @SuppressWarnings({"deprecation"})
-    static <T> Seq<T> seq(Collection<T> seq) {
-        return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+    private void verifyConfigDefaultValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        TestUtils.waitForCondition(() -> {
+            Map<String, String> current = client.describeConfigs(singletonList(configResource))
+                    .all()
+                    .get()
+                    .values()
+                    .stream()
+                    .flatMap(e -> e.entries().stream())
+                    .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll);
+            return config.stream().allMatch(current::containsKey);
+        }, 5000, config + " are not updated");
     }
 
     @SafeVarargs
-    public static String[] toArray(List<String>... lists) {
+    private static String[] toArray(List<String>... lists) {
         return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
     }
 
-    public static String captureStandardErr(Runnable runnable) {
-        return captureStandardStream(true, runnable);
+    private String captureStandardMsg(Runnable runnable) {
+        return captureStandardStream(runnable);
+    }
+
+    private String transferConfigMapToString(Map<String, String> configs) {
+        return configs.entrySet()
+                .stream()
+                .map(e -> e.getKey() + "=" + e.getValue())
+                .collect(Collectors.joining(","));
     }
 
-    private static String captureStandardStream(boolean isErr, Runnable runnable) {
+    private String captureStandardStream(Runnable runnable) {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        PrintStream currentStream = isErr ? System.err : System.out;
-        PrintStream tempStream = new PrintStream(outputStream);
-        if (isErr)
+        PrintStream currentStream = System.err;
+        try (PrintStream tempStream = new PrintStream(outputStream)) {
             System.setErr(tempStream);
-        else
-            System.setOut(tempStream);
-        try {
-            runnable.run();
-            return outputStream.toString().trim();
-        } finally {
-            if (isErr)
+            try {
+                runnable.run();
+                return outputStream.toString().trim();
+            } finally {
                 System.setErr(currentStream);
-            else
-                System.setOut(currentStream);
-
-            tempStream.close();
+            }
         }
     }
 }
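
Two helpers in the rewritten test deserve a note. First, captureStandardStream now always captures System.err and restores it with try-with-resources; detached from the test class it reduces to roughly the following (a sketch, not the committed code verbatim):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    final class StderrCapture {
        // Redirect System.err while the task runs, then restore it; the
        // try-with-resources block guarantees the temporary stream is closed.
        static String capture(Runnable task) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            PrintStream original = System.err;
            try (PrintStream temp = new PrintStream(buffer)) {
                System.setErr(temp);
                try {
                    task.run();
                    return buffer.toString().trim();
                } finally {
                    System.setErr(original);
                }
            }
        }
    }

Second, verifyConfig polls with TestUtils.waitForCondition rather than asserting once, because dynamic broker config changes propagate asynchronously; the describeConfigs result is flattened into a name-to-value map before each comparison.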
diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java
index afb22a06c24..5968f3706a1 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java
@@ -421,8 +421,8 @@ public class ConfigCommandTest {
     public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String...args) {
         ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0), connectOpts.get(1), "--describe"), Arrays.asList(args)));
         createOpts.checkArgs();
-        assertEquals(createOpts.entityTypes().toSeq(), ConfigCommandIntegrationTest.seq(expectedTypes));
-        assertEquals(createOpts.entityNames().toSeq(), ConfigCommandIntegrationTest.seq(expectedNames));
+        assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes));
+        assertEquals(createOpts.entityNames().toSeq(), seq(expectedNames));
     }
 
     public void doTestOptionEntityTypeNames(boolean zkConfig) {
@@ -1710,7 +1710,7 @@ public class ConfigCommandTest {
     public void checkEntities(List<String> opts, Map<String, List<String>> expectedFetches, List<String> expectedEntityNames) {
         ConfigCommand.ConfigEntity entity = ConfigCommand.parseEntity(new ConfigCommand.ConfigCommandOptions(toArray(opts, Collections.singletonList("--describe"))));
         expectedFetches.forEach((name, values) ->
-            when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(ConfigCommandIntegrationTest.seq(values)));
+            when(zkClient.getAllEntitiesWithConfig(name)).thenReturn(seq(values)));
         Seq<ConfigCommand.ConfigEntity> entities0 = entity.getAllEntities(zkClient);
         List<ConfigCommand.ConfigEntity> entities = new ArrayList<>();
         entities0.foreach(e -> {
@@ -1996,4 +1996,9 @@ public class ConfigCommandTest {
             return mock(AlterClientQuotasResult.class);
         }
     }
+
+    @SuppressWarnings({"deprecation"})
+    private <T> Seq<T> seq(Collection<T> seq) {
+        return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+    }
 }
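
With the integration test's public seq helper gone, ConfigCommandTest carries its own private copy. The conversion is the standard (deprecated, retained for Scala 2.12 compatibility) JavaConverters bridge; as a standalone utility it would look like this sketch:

    import java.util.Collection;
    import scala.collection.JavaConverters;
    import scala.collection.Seq;

    final class ScalaBridge {
        // Build a Scala Seq from a Java Collection; scala.jdk.CollectionConverters
        // supersedes this API in Scala 2.13+.
        @SuppressWarnings("deprecation")
        static <T> Seq<T> seq(Collection<T> items) {
            return JavaConverters.asScalaIteratorConverter(items.iterator()).asScala().toSeq();
        }
    }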

