chia7712 commented on code in PR #15744: URL: https://github.com/apache/kafka/pull/15744#discussion_r1580130359
########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest { dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get } - def alterTopicConfig(admin: Admin): AlterConfigsResult = { + def alterBrokerConfigs(admin: Admin): Unit = { + val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") + val defaultBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "86400000"), AlterConfigOp.OpType.SET), + ).asJavaCollection + val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") + val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1") + val specificBrokerConfigs = Seq( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "43200000"), AlterConfigOp.OpType.SET), + ).asJavaCollection + + TestUtils.retry(60000) { + val result = admin.incrementalAlterConfigs(Map( + defaultBrokerResource -> defaultBrokerConfigs, + broker0Resource -> specificBrokerConfigs, + broker1Resource -> specificBrokerConfigs + ).asJava) + try { + result.all().get(10, TimeUnit.SECONDS) + } catch { + case t: Throwable => fail("Alter Broker Configs had an error", t) + } + } Review Comment: I have the same question here. 1. In the test case `testIncrementalAlterConfigsPreMigration`, it should pass at once since the KRaft quorum is not ready. 2. In the test `testDualWrite`, it should pass at once, as we have already waited for the migration to begin. 
########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -974,7 +1035,11 @@ class ZkMigrationIntegrationTest { quotas.add(new ClientQuotaAlteration( new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava), List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava)) - admin.alterClientQuotas(quotas) + try { + admin.alterClientQuotas(quotas).all().get(10, TimeUnit.SECONDS) + } catch { + case t: Throwable => fail("Alter Client Quotas had an error", t) Review Comment: Ditto. I'm not sure why we need to catch it and then call `fail`. ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest { // Alter the metadata log.info("Updating metadata with AdminClient") admin = zkCluster.createAdminClient() - alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS) - alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS) + alterTopicConfig(admin) + alterClientQuotas(admin) + alterBrokerConfigs(admin) // Verify the changes made to KRaft are seen in ZK log.info("Verifying metadata changes with ZK") verifyTopicConfigs(zkClient) verifyClientQuotas(zkClient) + verifyBrokerConfigs(zkClient) val nextKRaftProducerId = sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, TimeUnit.SECONDS) assertNotEquals(nextProducerId, nextKRaftProducerId) - + } catch { + case t: Throwable => fail("Uncaught error in test", t) Review Comment: Why catch the error in this test case? The test case will fail even if we don't catch it. Or do you plan to make it retryable with `TestUtils.retry`? 
########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array( + new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), + new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), + new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"), + new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"), + )) + def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): Unit = { + // Enable migration configs and restart brokers without KRaft quorum ready + zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") + zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9999") Review Comment: `RaftConfig` is renamed to `QuorumConfig` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org