chia7712 commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1589865773
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2992,6 +2992,11 @@ class KafkaApis(val requestChannel: RequestChannel, val preprocessingResponses = configManager.preprocess(original.data(), (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS, rType, rName)) val remaining = ConfigAdminManager.copyWithoutPreprocessed(original.data(), preprocessingResponses) + val isKRaftController = metadataSupport match { Review Comment: Could you please add comments for this check? It seems to me that a race condition always needs good docs. ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -995,16 +1027,47 @@ class ZkMigrationIntegrationTest { dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get } - def alterTopicConfig(admin: Admin): AlterConfigsResult = { + def alterBrokerConfigs(admin: Admin, shouldRetry: Boolean = false): Unit = { Review Comment: ditto. please remove the unused default value. ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest { admin.alterUserScramCredentials(alterations) } - def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = { - TestUtils.retry(10000) { + def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = false): Unit = { Review Comment: It seems the default value `= false` is unused, so could you please remove it? 
########## core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java: ########## @@ -92,7 +92,7 @@ public List<Extension> getAdditionalExtensions() { if (clusterConfig.numControllers() != 1) { throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller."); } - ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference); + ZkClusterInstance clusterShim = new ZkClusterInstance(clusterConfig, clusterReference); Review Comment: Maybe we can revert this unrelated change? ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -1037,24 +1104,35 @@ class ZkMigrationIntegrationTest { admin.alterUserScramCredentials(alterations) } - def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = { - TestUtils.retry(10000) { + def verifyTopicConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = false): Unit = { + maybeRetry(shouldRetry, 10000) { val propsAfter = zkClient.getEntityConfigs(ConfigType.TOPIC, "test") assertEquals("204800", propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG)) assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG)) } } - def verifyClientQuotas(zkClient: KafkaZkClient): Unit = { - TestUtils.retry(10000) { - assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate")) - assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "<default>").getProperty("consumer_byte_rate")) - assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate")) - assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate")) - assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate")) + def verifyBrokerConfigs(zkClient: KafkaZkClient, shouldRetry: Boolean = false): Unit 
= { Review Comment: ditto. please remove the unused default value. ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -995,16 +1027,47 @@ class ZkMigrationIntegrationTest { dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get } - def alterTopicConfig(admin: Admin): AlterConfigsResult = { + def alterBrokerConfigs(admin: Admin, shouldRetry: Boolean = false): Unit = { + val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") + val defaultBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "86400000"), AlterConfigOp.OpType.SET), + ).asJavaCollection + val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") + val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1") + val specificBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "43200000"), AlterConfigOp.OpType.SET), + ).asJavaCollection + + maybeRetry(shouldRetry, 60000) { + val result = admin.incrementalAlterConfigs(Map( + defaultBrokerResource -> defaultBrokerConfigs, + broker0Resource -> specificBrokerConfigs, + broker1Resource -> specificBrokerConfigs + ).asJava) + try { + result.all().get(10, TimeUnit.SECONDS) + } catch { + case t: Throwable => fail("Alter Broker Configs had an error", t) + } + } + } + + def alterTopicConfig(admin: Admin, shouldRetry: Boolean = false): Unit = { Review Comment: ditto. please remove the unused default value. 
########## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ########## @@ -467,13 +474,42 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode + * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined */ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { + val controllerZkVersion = if (!enableEntityConfigNoController) { Review Comment: Out of curiosity, why do we have a flag to control the checks even though there is a real issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org