chia7712 commented on code in PR #15808:
URL: https://github.com/apache/kafka/pull/15808#discussion_r1580291046


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -1594,10 +1313,6 @@ object TestUtils extends Logging {
     }
   }
 
-  def createIsrChangeListener(): MockAlterPartitionListener = {

Review Comment:
   `MockAlterPartitionListener` is used by `AbstractPartitionTest` only, so 
could you move it to `AbstractPartitionTest`?



##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -548,4 +549,31 @@ class DeleteTopicTest extends QuorumTestHarness {
     TestUtils.waitUntilTrue(() => 
brokers.exists(_.asInstanceOf[KafkaServer].kafkaController.isActive), "No 
controller is elected")
     TestUtils.verifyTopicDeletion(zkClient, topic, 2, brokers)
   }
+
+  private def increasePartitions[B <: KafkaBroker](admin: Admin,
+                                           topic: String,
+                                           totalPartitionCount: Int,
+                                           brokersToValidate: Seq[B]
+                                          ): Unit = {
+
+    try {
+      val newPartitionSet: Map[String, NewPartitions] = Map.apply(topic -> 
NewPartitions.increaseTo(totalPartitionCount))
+      admin.createPartitions(newPartitionSet.asJava)
+    } catch {
+      case e: ExecutionException =>
+        throw e
+    }
+
+    if (brokersToValidate.nonEmpty) {
+      // wait until we've propagated all partitions metadata to all brokers
+      val allPartitionsMetadata = 
waitForAllPartitionsMetadata(brokersToValidate, topic, totalPartitionCount)
+
+      (0 until totalPartitionCount - 1).map { i =>
+        i -> allPartitionsMetadata.get(new TopicPartition(topic, 
i)).map(_.leader()).getOrElse(
+          throw new IllegalStateException(s"Cannot get the partition leader 
for topic: $topic, partition: $i in server metadata cache"))

Review Comment:
   This method returns nothing, so we don't need to collect the metadata. 
However, it still needs to check that each topic partition exists.



##########
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##########
@@ -613,4 +613,19 @@ class LogSegmentTest {
     )
   }
 
+  private def checkEquals[T](s1: java.util.Iterator[T], s2: 
java.util.Iterator[T]): Unit = {
+    while (s1.hasNext && s2.hasNext)
+      assertEquals(s1.next, s2.next)
+    assertFalse(s1.hasNext, "Iterators have uneven length--first has more")
+    assertFalse(s2.hasNext, "Iterators have uneven length--second has more")
+  }
+
+  private def writeNonsenseToFile(fileName: File, position: Long, size: Int): 
Unit = {
+    val file = new RandomAccessFile(fileName, "rw")
+    file.seek(position)
+    for (_ <- 0 until size)
+      file.writeByte(random.nextInt(255))
+    file.close()

Review Comment:
   Could you please add try-finally so the file is closed even if a write fails?



##########
core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala:
##########
@@ -149,6 +150,23 @@ class SaslPlainSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
    */
   @Test
   def testAcls(): Unit = {
-    TestUtils.verifySecureZkAcls(zkClient, 1)
+    verifySecureZkAcls(zkClient, 1)
+  }
+
+  /**
+   * Verifies that all secure paths in ZK are created with the expected ACL.
+   */
+  private def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: 
Int): Unit = {

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -548,4 +549,31 @@ class DeleteTopicTest extends QuorumTestHarness {
     TestUtils.waitUntilTrue(() => 
brokers.exists(_.asInstanceOf[KafkaServer].kafkaController.isActive), "No 
controller is elected")
     TestUtils.verifyTopicDeletion(zkClient, topic, 2, brokers)
   }
+
+  private def increasePartitions[B <: KafkaBroker](admin: Admin,
+                                           topic: String,
+                                           totalPartitionCount: Int,
+                                           brokersToValidate: Seq[B]
+                                          ): Unit = {
+
+    try {
+      val newPartitionSet: Map[String, NewPartitions] = Map.apply(topic -> 
NewPartitions.increaseTo(totalPartitionCount))
+      admin.createPartitions(newPartitionSet.asJava)
+    } catch {

Review Comment:
   This catch only rethrows the exception, so it should be fine to remove it.



##########
core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala:
##########
@@ -52,6 +56,20 @@ class SaslPlainPlaintextConsumerTest extends 
BaseConsumerTest with SaslSetup {
    */
   @Test
   def testZkAclsDisabled(): Unit = {
-    TestUtils.verifyUnsecureZkAcls(zkClient)
+    verifyUnsecureZkAcls(zkClient)
+  }
+
+  /**
+   * Verifies that secure paths in ZK have no access control. This is
+   * the case when zookeeper.set.acl=false and no ACLs have been configured.
+   */
+  private def verifyUnsecureZkAcls(zkClient: KafkaZkClient): Unit = {
+    secureZkPaths(zkClient).foreach(path => {

Review Comment:
   We can merge this method into the test case `testZkAclsDisabled`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to