Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1916691435 Thank you @jlprat for the fix! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
jlprat commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1916625862 Fix is here: https://github.com/apache/kafka/pull/15290
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
jlprat commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1916610169 Hi @tinaselenge and @dengziming I think this change broke the build. Right now Kafka can't be compiled with Scala 2.12, see the error here: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14846/15/pipeline/9 > [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14846/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:490:62: the result type of an implicit conversion must be more specific than Object
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming merged PR #14846: URL: https://github.com/apache/kafka/pull/14846
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1893497699 @dengziming thank you again for reviewing the code. I addressed your comments; please let me know if I missed anything or if you have any further comments.
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1453250983

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaBroker] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
-    val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val brokers = brokerConfigs.map(b => createBroker(KafkaConfig.fromProps(b)))
+
+    admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))

Review Comment:
Not sure if this is necessary, as we only create the admin here when a new set of brokers is created.
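The polling loop added to `testDeleteTopicAlreadyMarkedAsDeleted` in the hunk above can be tried without a cluster. What follows is only a rough sketch: `waitUntilTrue` here is a simplified local helper (not Kafka's `TestUtils.waitUntilTrue`), and `FakeAdmin` and `UnknownTopicException` are hypothetical stand-ins for the admin client and `UnknownTopicOrPartitionException`. It shows the idea of repeating the delete until the failure's cause says the topic is no longer known.

```scala
import java.util.concurrent.ExecutionException

object RetryUntilDeleted {
  // Hypothetical stand-in for the exception the test looks for in the cause.
  final class UnknownTopicException(msg: String) extends RuntimeException(msg)

  // Simplified polling helper: re-evaluates `condition` until it holds or the timeout elapses.
  def waitUntilTrue(condition: () => Boolean, msg: => String,
                    timeoutMs: Long = 2000L, pauseMs: Long = 10L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline) throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }

  // Simulated cluster: the topic stays visible for a few calls after deletion starts.
  final class FakeAdmin(private var callsUntilGone: Int) {
    def deleteTopic(topic: String): Unit = {
      if (callsUntilGone <= 0)
        throw new ExecutionException(new UnknownTopicException(s"$topic does not exist"))
      callsUntilGone -= 1
    }
  }

  def deleteUntilUnknown(admin: FakeAdmin, topic: String): Unit =
    waitUntilTrue(() => {
      try {
        admin.deleteTopic(topic)
        false // the call succeeded, so the topic was still known; poll again
      } catch {
        case e: ExecutionException => e.getCause.isInstanceOf[UnknownTopicException]
      }
    }, s"Topic $topic should be marked for deletion or already deleted.")
}
```

Unlike the removed `assertThrows`, this form does not depend on how quickly the deletion becomes visible, which is presumably why it suits both quorum modes.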
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
      */
     val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull, enableControlledShutdown = false)

Review Comment:
Why not use `createTestTopicAndCluster` directly?

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -19,78 +19,95 @@ package kafka.admin
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-
 import scala.collection.Seq
 import kafka.log.UnifiedLog
 import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException, UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
 import scala.jdk.CollectionConverters._
 class DeleteTopicTest extends QuorumTestHarness {
+  var brokers: Seq[KafkaBroker] = Seq()
+  var servers: Seq[KafkaServer] = Seq()

Review Comment:
I think it's unnecessary to keep both servers and brokers; we can use `KafkaBroker` in most cases, or cast it to `KafkaServer` if necessary.

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
-  private def
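The review comment about keeping a single `brokers` field and casting only where needed might look roughly like the sketch below. `Broker`, `ZkServer`, and `KRaftBroker` are hypothetical stand-ins (assumed here to mirror the relationship between `KafkaBroker` and `KafkaServer`); the real classes need a running cluster, so this only illustrates the narrowing step.

```scala
object BrokerCasting {
  // Hypothetical stand-ins for the common broker type and its mode-specific subtypes.
  trait Broker { def brokerId: Int }
  final case class ZkServer(brokerId: Int, controllerEpoch: Int) extends Broker
  final case class KRaftBroker(brokerId: Int) extends Broker

  // Keep one Seq[Broker] in the test and narrow to the ZK type only where
  // ZK-only state (here the made-up `controllerEpoch`) is actually needed.
  def zkServers(brokers: Seq[Broker]): Seq[ZkServer] =
    brokers.collect { case s: ZkServer => s }
}
```

Using `collect` with a type pattern avoids an unchecked `asInstanceOf` and simply yields an empty sequence when the test runs in the other mode.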
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1441637951

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
     }.toSet
   }
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-    val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-    // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    this.servers = allServers
-    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-    // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-      "Replicas for topic test not created.")
-    // shutdown a broker to make sure the following topic deletion will be suspended
-    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]")
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-    follower.shutdown()
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
-    // make sure deletion of all of the topic's replicas have been tried
-    ensureControllerExists()
-    val (controller, controllerId) = getController()
-    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
-    TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
-      val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
-    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+    if (isKRaftTest()) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled = true)
+      this.brokers = allBrokers
+      val partitionHostingBrokers = allBrokers.filter(b => expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be suspended
+      val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+      val follower = partitionHostingBrokers.filter(s => s.config.brokerId != leaderIdOpt).last
+      follower.shutdown()
+      // start topic deletion
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
+      }
-    // increase the partition count for topic
-    val props = new Properties()
-    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers))
-    TestUtils.resource(Admin.create(props)) { adminClient =>
-      try {
-        adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
-      } catch {
-        case _: ExecutionException =>
+      // bring back the failed broker
+      follower.startup()
+      TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+    } else {
+      val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+      // create brokers
+      val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+      this.servers = allServers
+      val partitionHostingServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1440196006

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
     }.toSet
   }
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-    val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-    // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    this.servers = allServers
-    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-    // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-      "Replicas for topic test not created.")
-    // shutdown a broker to make sure the following topic deletion will be suspended
-    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]")
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-    follower.shutdown()
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
-    // make sure deletion of all of the topic's replicas have been tried
-    ensureControllerExists()
-    val (controller, controllerId) = getController()
-    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
-    TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
-      val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
-    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+    if (isKRaftTest()) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled = true)
+      this.brokers = allBrokers
+      val partitionHostingBrokers = allBrokers.filter(b => expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be suspended
+      val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+      val follower = partitionHostingBrokers.filter(s => s.config.brokerId != leaderIdOpt).last
+      follower.shutdown()
+      // start topic deletion
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
+      }
-    // increase the partition count for topic
-    val props = new Properties()
-    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers))
-    TestUtils.resource(Admin.create(props)) { adminClient =>
-      try {
-        adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
-      } catch {
-        case _: ExecutionException =>
+      // bring back the failed broker
+      follower.startup()
+      TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+    } else {
+      val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+      // create brokers
+      val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+      this.servers = allServers
+      val partitionHostingServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1867865063 @dengziming I have triggered the build 3 more times and DeleteTopicTest passed. I have also enabled KRaft for testIncreasePartitionCountDuringDeleteTopic (the two modes didn't share much code, so I simply separated them with a single `if (!kraftTest())` clause). I also added comments to 2 other tests explaining why KRaft is not enabled for them.
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1860310729 Hello @tinaselenge, thanks for the update. Here are some suggestions:
1. Where a test requires controllerContext state, as `testIncreasePartitionCountDuringDeleteTopic` does, it is testing concurrency problems. KRaft mode uses a single event queue, so it's unnecessary to validate these states there, and we can put these validations in an `if (!kraftTest())` clause.
2. If we need to validate a zk path, we can also put that in an `if (!kraftTest())` clause.
3. Operations that use `adminZkClient` can use `AdminClient` instead, for example `AdminClient.deleteTopics` in place of `adminZkClient.deleteTopic`.
Also, please merge trunk to trigger CI at least 3 times; we have been badly affected by flaky tests in the past and should avoid them as much as possible.
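Suggestions 1 and 2 above amount to running the shared assertions in both modes and skipping the ZooKeeper-only ones under KRaft. A minimal sketch of that shape, assuming a hypothetical `Harness` class in place of the real test harness and plain strings in place of real checks (none of these names come from the Kafka code base):

```scala
object QuorumGuard {
  // Hypothetical stand-in for the test harness; `quorum` mirrors the "zk"/"kraft" test parameter.
  final class Harness(quorum: String) {
    def isKRaftTest: Boolean = quorum == "kraft"
  }

  // Returns the names of the checks that would run, so the branching is observable.
  def verifyDeletion(h: Harness): List[String] = {
    val common = List("topic absent from metadata", "replica logs removed")
    // ZK-only validations (znode paths, controllerContext replica states) are skipped in KRaft mode.
    if (!h.isKRaftTest) common ++ List("ZK topic path removed", "controllerContext replica states")
    else common
  }
}
```

Keeping the common checks outside the guard means a single parameterized test still covers both modes, and only the mode-specific part diverges.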
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1853763567 @dengziming I believe I have fixed the flaky tests; they did not fail in any of the recent builds. The builds still failed, but due to unrelated tests.
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1842367014 Thanks @dengziming. I will investigate them.
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1841979753 Hello @tinaselenge, there are some flaky tests related to this change. We should take some time to investigate them, and we should trigger the CI a few more times to make sure they are no longer flaky. https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//#showFailuresLink [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testIncreasePartitionCountDuringDeleteTopic()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testIncreasePartitionCountDuringDeleteTopic__/) [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testDisableDeleteTopic(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testDisableDeleteTopic_String__quorum_kraft/) [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.executionError](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___executionError/)
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1829653182 Hi @dengziming, can you please take a look at this PR whenever you have time? There are a few test cases that I couldn't run in KRaft mode. They require replica states from the controllerContext of KafkaServer. Some of them also check ZooKeeper paths to see whether topics are marked for deletion but not yet deleted. I left those as they are, because they are ZooKeeper-specific scenarios. I was wondering what you think, or whether you could suggest a way to enable KRaft for these test cases. Thank you
[PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge opened a new pull request, #14846: URL: https://github.com/apache/kafka/pull/14846 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)