chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1589865773


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2992,6 +2992,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     val preprocessingResponses = configManager.preprocess(original.data(),
       (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, 
rType, rName))
     val remaining = 
ConfigAdminManager.copyWithoutPreprocessed(original.data(), 
preprocessingResponses)
+    val isKRaftController = metadataSupport match {

Review Comment:
   Could you please add comments for this check? It seems to me that race conditions
always need good docs.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -995,16 +1027,47 @@ class ZkMigrationIntegrationTest {
     dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin, shouldRetry: Boolean = false): Unit = {

Review Comment:
   ditto. please remove the unused default value.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
     admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(10000) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = 
false): Unit = {

Review Comment:
   It seems the default value `= false` is unused, so could you please remove 
it?



##########
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##########
@@ -92,7 +92,7 @@ public List<Extension> getAdditionalExtensions() {
         if (clusterConfig.numControllers() != 1) {
             throw new IllegalArgumentException("For ZK clusters, please 
specify exactly 1 controller.");
         }
-        ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, 
clusterReference);
+        ZkClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, 
clusterReference);

Review Comment:
   Maybe we can revert this unrelated change?



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest {
     admin.alterUserScramCredentials(alterations)
   }
 
-  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(10000) {
+  def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = 
false): Unit = {
+    maybeRetry(shouldRetry, 10000) {
       val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test")
       assertEquals("204800", 
propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
       assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
     }
   }
 
-  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
-    TestUtils.retry(10000) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, 
Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
-      assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, 
"<default>").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/" + 
Sanitizer.sanitize("user@1") + "/clients", 
"clientA").getProperty("producer_byte_rate"))
-      assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, 
"8.8.8.8").getProperty("connection_creation_rate"))
+  def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = 
false): Unit = {

Review Comment:
   ditto. please remove the unused default value.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -995,16 +1027,47 @@ class ZkMigrationIntegrationTest {
     dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin, shouldRetry: Boolean = false): Unit = {
+    val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+    val defaultBrokerConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "86400000"), 
AlterConfigOp.OpType.SET),
+    ).asJavaCollection
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val specificBrokerConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "43200000"), 
AlterConfigOp.OpType.SET),
+    ).asJavaCollection
+
+    maybeRetry(shouldRetry, 60000) {
+      val result = admin.incrementalAlterConfigs(Map(
+        defaultBrokerResource -> defaultBrokerConfigs,
+        broker0Resource -> specificBrokerConfigs,
+        broker1Resource -> specificBrokerConfigs
+      ).asJava)
+      try {
+        result.all().get(10, TimeUnit.SECONDS)
+      } catch {
+        case t: Throwable => fail("Alter Broker Configs had an error", t)
+      }
+    }
+  }
+
+  def alterTopicConfig(admin: Admin, shouldRetry: Boolean = false): Unit = {

Review Comment:
   ditto. please remove the unused default value.



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -467,13 +474,42 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating 
the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft 
controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: 
String, config: Properties): Unit = {
+    val controllerZkVersion = if (!enableEntityConfigNoController) {

Review Comment:
   Out of curiosity: why do we have a flag to control the checks even though there
is a real issue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to