Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-30 Thread via GitHub


tinaselenge commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1916691435

   Thank you @jlprat for the fix!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-30 Thread via GitHub


jlprat commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1916625862

   Fix is here: https://github.com/apache/kafka/pull/15290





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-30 Thread via GitHub


jlprat commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1916610169

   Hi @tinaselenge and @dengziming,
   I think this change broke the build. Right now Kafka can't be compiled with Scala 2.12; see the error here: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14846/15/pipeline/9
   
   > [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-14846/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:490:62: the result type of an implicit conversion must be more specific than Object
   





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-29 Thread via GitHub


dengziming merged PR #14846:
URL: https://github.com/apache/kafka/pull/14846





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-16 Thread via GitHub


tinaselenge commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1893497699

   @dengziming thank you again for reviewing the code. I have addressed your comments; please let me know if I missed anything or if you have any further comments.





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-16 Thread via GitHub


tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1453250983


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
 
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
 
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaBroker] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
-    val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val brokers = brokerConfigs.map(b => createBroker(KafkaConfig.fromProps(b)))
+
+    admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))

Review Comment:
   Not sure if this is necessary, as we only create the admin here when a new set of brokers is created.






Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-10 Thread via GitHub


dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
    */
 
     val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull, enableControlledShutdown = false)

Review Comment:
   Why not use `createTestTopicAndCluster` directly?
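
   A minimal sketch of that suggestion, reusing the helper signature introduced elsewhere in this PR (illustrative only, not the committed code):

   ```scala
   // Illustrative only: call the updated helper instead of building broker configs inline.
   // Parameter names follow the new createTestTopicAndCluster signature added in this PR.
   val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
   brokers = createTestTopicAndCluster(topic, numOfConfigs = 3, deleteTopicEnabled = true, replicaAssignment = replicaAssignment)
   ```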



##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -19,78 +19,95 @@ package kafka.admin
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-
 import scala.collection.Seq
 import kafka.log.UnifiedLog
 import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException, UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicTest extends QuorumTestHarness {
 
+  var brokers: Seq[KafkaBroker] = Seq()
+
   var servers: Seq[KafkaServer] = Seq()

Review Comment:
   I think it's unnecessary to keep both `servers` and `brokers`; we can use `KafkaBroker` in most cases, or cast to `KafkaServer` where necessary.
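
   A rough sketch of that idea (assuming it sits inside the test class on the Kafka core test classpath; the down-cast helper is hypothetical and only valid on the zk quorum path):

   ```scala
   import kafka.server.{KafkaBroker, KafkaServer}

   // Keep a single collection of the common supertype...
   var brokers: Seq[KafkaBroker] = Seq()

   // ...and cast only where a ZK-era API really needs a KafkaServer.
   def zkServers: Seq[KafkaServer] = brokers.map(_.asInstanceOf[KafkaServer])
   ```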



##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
 
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
 
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def 

Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-04 Thread via GitHub


tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1441637951


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
     }.toSet
   }
 
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-    val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-    // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    this.servers = allServers
-    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-    // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-      "Replicas for topic test not created.")
-    // shutdown a broker to make sure the following topic deletion will be suspended
-    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]")
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-    follower.shutdown()
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
 
-    // make sure deletion of all of the topic's replicas have been tried
-    ensureControllerExists()
-    val (controller, controllerId) = getController()
-    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
-    TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
-      val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
-    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+    if (isKRaftTest()) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled = true)
+      this.brokers = allBrokers
+      val partitionHostingBrokers = allBrokers.filter(b => expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be suspended
+      val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+      val follower = partitionHostingBrokers.filter(s => s.config.brokerId != leaderIdOpt).last
+      follower.shutdown()
+      // start topic deletion
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
+      }
 
-    // increase the partition count for topic
-    val props = new Properties()
-    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers))
-    TestUtils.resource(Admin.create(props)) { adminClient =>
-      try {
-        adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
-      } catch {
-        case _: ExecutionException =>
+      // bring back the failed broker
+      follower.startup()
+      TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+    } else {
+      val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+      // create brokers
+      val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+      this.servers = allServers
+      val partitionHostingServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))

Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-03 Thread via GitHub


dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1440196006


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
     }.toSet
   }
 
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-    val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-    // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
-    this.servers = allServers
-    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-    // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-      "Replicas for topic test not created.")
-    // shutdown a broker to make sure the following topic deletion will be suspended
-    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]")
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-    follower.shutdown()
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
 
-    // make sure deletion of all of the topic's replicas have been tried
-    ensureControllerExists()
-    val (controller, controllerId) = getController()
-    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
-    TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
-      val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
-    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+    if (isKRaftTest()) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled = true)
+      this.brokers = allBrokers
+      val partitionHostingBrokers = allBrokers.filter(b => expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be suspended
+      val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+      val follower = partitionHostingBrokers.filter(s => s.config.brokerId != leaderIdOpt).last
+      follower.shutdown()
+      // start topic deletion
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
+      }
 
-    // increase the partition count for topic
-    val props = new Properties()
-    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers))
-    TestUtils.resource(Admin.create(props)) { adminClient =>
-      try {
-        adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get()
-      } catch {
-        case _: ExecutionException =>
+      // bring back the failed broker
+      follower.startup()
+      TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+    } else {
+      val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+      // create brokers
+      val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+      this.servers = allServers
+      val partitionHostingServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))


Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-12-22 Thread via GitHub


tinaselenge commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1867865063

   @dengziming I have triggered the build 3 more times and DeleteTopicTest passed. I have also enabled KRaft for testIncreasePartitionCountDuringDeleteTopic (the ZK and KRaft paths didn't have much code in common, so I simply separated them with a single `if (!kraftTest())` clause; see the sketch below). I also added comments for the 2 other tests explaining why KRaft is not enabled.
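
   For reference, the shape of that split, based on the diff in this PR (a sketch, not the exact committed code):

   ```scala
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     if (isKRaftTest()) {
       // KRaft path: drive topic deletion and partition changes through the Admin client only
     } else {
       // ZK path: keep the original zkClient/controllerContext based setup and assertions
     }
   }
   ```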





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-12-18 Thread via GitHub


dengziming commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1860310729

   Hello @tinaselenge, thanks for the update. Here are some of my suggestions:
   1. Where controllerContext state is required, such as in `testIncreasePartitionCountDuringDeleteTopic`, the test is exercising concurrency problems. KRaft mode uses a single event queue, so it's unnecessary to validate those states there; we can put these validations in an `if (!kraftTest())` clause.
   2. If we need to validate a ZooKeeper path, we can also put that check in an `if (!kraftTest())` clause.
   3. For operations that go through `adminZkClient`, we can replace them with the `Admin` client, for example replacing `adminZkClient.deleteTopic` with `Admin#deleteTopics` (see the sketch after this comment).
   
   Also, please merge trunk to trigger CI at least 3 times; we have been deeply affected by flaky tests in the past and should avoid them as much as possible.
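
   To make point 3 concrete, a small sketch mirroring the pattern already used in this PR's diff (illustrative, not prescriptive):

   ```scala
   // Point 3: drive deletion through the Admin client instead of adminZkClient.
   admin.deleteTopics(Collections.singletonList(topic)).all().get()

   // Points 1 and 2: keep controllerContext / ZooKeeper-path validations behind the quorum guard.
   if (!isKRaftTest()) {
     val (controller, _) = getController()
     assertTrue(controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful).nonEmpty)
   }
   ```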
   
   





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-12-13 Thread via GitHub


tinaselenge commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1853763567

   @dengziming I believe I fixed the flaky tests; they did not appear to fail in any of the recent builds. The builds still failed, but only due to unrelated tests.





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-12-06 Thread via GitHub


tinaselenge commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1842367014

   Thanks @dengziming. I will investigate them. 





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-12-05 Thread via GitHub


dengziming commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1841979753

   Hello @tinaselenge, there are some flaky test failures related to this. We should take some time to investigate them, and we should trigger the CI a few more times to make sure the flakiness is gone.
   
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//#showFailuresLink
   
   [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testIncreasePartitionCountDuringDeleteTopic()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testIncreasePartitionCountDuringDeleteTopic__/)
   [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testDisableDeleteTopic(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testDisableDeleteTopic_String__quorum_kraft/)
   [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.executionError](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___executionError/)





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-11-28 Thread via GitHub


tinaselenge commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1829653182

   Hi @dengziming, can you please take a look at this PR whenever you have time? There are a few test cases that I couldn't run in KRaft mode: they require replica states from the controllerContext of KafkaServer, and some of them also check ZooKeeper paths to see whether a topic is marked as deleted but not yet deleted (see the sketch below). I left those as they are, because they are ZooKeeper-specific scenarios. I was wondering what you think, or whether you could suggest a way to enable KRaft for these test cases. Thank you.
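
   For context, the kind of ZK-only check being referred to looks roughly like this in the existing test (a sketch assembled from the current code, not a proposal):

   ```scala
   // Replica-state checks that only exist on the ZooKeeper controller:
   val (controller, _) = getController()
   val deletedReplicas = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)

   // Plus ZooKeeper-path checks for "marked for deletion but not yet deleted",
   // e.g. probing the topic/partition znode (kafka.zk.TopicPartitionZNode) via zkClient;
   // neither of these has a KRaft equivalent in this test harness.
   ```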





[PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-11-27 Thread via GitHub


tinaselenge opened a new pull request, #14846:
URL: https://github.com/apache/kafka/pull/14846

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

