chia7712 commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1580130359


##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -950,16 +980,47 @@ class ZkMigrationIntegrationTest {
     dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
   }
 
-  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+  def alterBrokerConfigs(admin: Admin): Unit = {
+    val defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, 
"")
+    val defaultBrokerConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "86400000"), 
AlterConfigOp.OpType.SET),
+    ).asJavaCollection
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    val broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val specificBrokerConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "43200000"), 
AlterConfigOp.OpType.SET),
+    ).asJavaCollection
+
+    TestUtils.retry(60000) {
+      val result = admin.incrementalAlterConfigs(Map(
+        defaultBrokerResource -> defaultBrokerConfigs,
+        broker0Resource -> specificBrokerConfigs,
+        broker1Resource -> specificBrokerConfigs
+      ).asJava)
+      try {
+        result.all().get(10, TimeUnit.SECONDS)
+      } catch {
+        case t: Throwable => fail("Alter Broker Configs had an error", t)
+      }
+    }

Review Comment:
   I have the same question here.
   1. In the test case `testIncrementalAlterConfigsPreMigration`, it should 
pass at once since KRaft quorum is not ready.
   2. In the test `testDualWrite`,  it should pass at once as we have wait for 
migration to begin.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -974,7 +1035,11 @@ class ZkMigrationIntegrationTest {
     quotas.add(new ClientQuotaAlteration(
       new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
       List(new ClientQuotaAlteration.Op("connection_creation_rate", 
10.0)).asJava))
-    admin.alterClientQuotas(quotas)
+    try {
+      admin.alterClientQuotas(quotas).all().get(10, TimeUnit.SECONDS)
+    } catch {
+      case t: Throwable => fail("Alter Client Quotas had an error", t)

Review Comment:
   ditto. not sure why we need to catch it and then call `fail`



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -620,16 +620,19 @@ class ZkMigrationIntegrationTest {
       // Alter the metadata
       log.info("Updating metadata with AdminClient")
       admin = zkCluster.createAdminClient()
-      alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
-      alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+      alterTopicConfig(admin)
+      alterClientQuotas(admin)
+      alterBrokerConfigs(admin)
 
       // Verify the changes made to KRaft are seen in ZK
       log.info("Verifying metadata changes with ZK")
       verifyTopicConfigs(zkClient)
       verifyClientQuotas(zkClient)
+      verifyBrokerConfigs(zkClient)
       val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
       assertNotEquals(nextProducerId, nextKRaftProducerId)
-
+    } catch {
+      case t: Throwable => fail("Uncaught error in test", t)

Review Comment:
   why catching the error in this test case? The test case can get failed even 
though we don't catch it. Or you plan to make it be retryable by 
`TestUtils.retry`?



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -852,6 +855,33 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testIncrementalAlterConfigsPreMigration(zkCluster: ClusterInstance): 
Unit = {
+    // Enable migration configs and restart brokers without KRaft quorum ready
+    
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+    zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
"1@localhost:9999")

Review Comment:
   `RaftConfig` is renamed to `QuorumConfig`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to