This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdf3aab661f MINOR: remove zk from several tests (#17949)
cdf3aab661f is described below

commit cdf3aab661fb975064bb1bffb61be5bbff138b62
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Nov 27 12:52:04 2024 -0800

    MINOR: remove zk from several tests (#17949)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/admin/RemoteTopicCrudTest.scala          | 33 +-------
 .../kafka/server/GssapiAuthenticationTest.scala    |  2 +-
 .../kafka/server/DeleteTopicsRequestTest.scala     | 93 +---------------------
 .../SaslClientsWithInvalidCredentialsTest.java     | 35 +++++---
 4 files changed, 26 insertions(+), 137 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 95c0b015ac5..e24731261ac 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -444,35 +444,6 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
       () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling 
remote storage feature on the topic level is not supported.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk"))
-  def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
-    val admin = createAdminClient()
-    val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or 
`remote.log.copy.disable` under Zookeeper's mode."
-    val topicConfig = new Properties
-    topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true")
-    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
-      topicConfig = topicConfig)
-
-    val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
-    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
-      util.Arrays.asList(
-        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
-          AlterConfigOp.OpType.SET),
-      ))
-    assertThrowsException(classOf[InvalidConfigurationException],
-      () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
-
-    configs.clear()
-    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
-      util.Arrays.asList(
-        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
-          AlterConfigOp.OpType.SET),
-      ))
-    assertThrowsException(classOf[InvalidConfigurationException],
-      () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
-  }
-
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testTopicDeletion(quorum: String): Unit = {
@@ -501,7 +472,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, controllerServers, numPartitions, brokerCount,
       topicConfig = topicConfig)
 
-    val tsDisabledProps = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).head
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head
     instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
 
     recreateBrokers(startup = true)
@@ -519,7 +490,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, controllerServers, numPartitions, brokerCount,
       topicConfig = topicConfig)
 
-    val tsDisabledProps = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).head
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head
     instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
 
     recreateBrokers(startup = true)
diff --git 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 125411168d4..fd6cac4a532 100644
--- 
a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -181,7 +181,7 @@ class GssapiAuthenticationTest extends 
IntegrationTestHarness with SaslSetup {
    * is thrown immediately, and is not affected by 
<code>connection.failed.authentication.delay.ms</code>.
    */
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testServerAuthenticationFailure(quorum: String, groupProtocol: String): 
Unit = {
     // Setup client with a non-existent service principal, so that server 
authentication fails on the client
     val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism, 
Some("another-kafka-service"))
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 74ef9d96964..940703c828c 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.util
 import kafka.network.SocketServer
 import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.message.DeleteTopicsRequestData
 import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.protocol.Errors
@@ -125,96 +124,6 @@ class DeleteTopicsRequestTest extends BaseRequestTest with 
Logging {
     }
   }
 
-  /*
-   * Only run this test against ZK cluster. The KRaft controller doesn't 
perform operations that have timed out.
-   */
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk"))
-  def testErrorDeleteTopicRequests(quorum: String): Unit = {
-    val timeout = 30000
-    val timeoutTopic = "invalid-timeout"
-
-    // Basic
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
-        new DeleteTopicsRequestData()
-          .setTopicNames(util.Arrays.asList("invalid-topic"))
-          .setTimeoutMs(timeout)).build(),
-      Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
-
-    // Partial
-    createTopic("partial-topic-1")
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
-        new DeleteTopicsRequestData()
-          .setTopicNames(util.Arrays.asList("partial-topic-1", 
"partial-invalid-topic"))
-          .setTimeoutMs(timeout)).build(),
-      Map(
-        "partial-topic-1" -> Errors.NONE,
-        "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
-      )
-    )
-
-    // Topic IDs
-    createTopic("topic-id-1")
-    val validId = getTopicIds()("topic-id-1")
-    val invalidId = Uuid.randomUuid
-    validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder(
-      new DeleteTopicsRequestData()
-        .setTopics(util.Arrays.asList(new 
DeleteTopicState().setTopicId(invalidId),
-            new DeleteTopicState().setTopicId(validId)))
-        .setTimeoutMs(timeout)).build(),
-      Map(
-        invalidId -> Errors.UNKNOWN_TOPIC_ID,
-        validId -> Errors.NONE
-      )
-    )
-
-    // Timeout
-    createTopic(timeoutTopic, 5, 2)
-    // Must be a 0ms timeout to avoid transient test failures. Even a timeout 
of 1ms has succeeded in the past.
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
-        new DeleteTopicsRequestData()
-          .setTopicNames(util.Arrays.asList(timeoutTopic))
-          .setTimeoutMs(0)).build(),
-      Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
-    // The topic should still get deleted eventually
-    TestUtils.waitUntilTrue(() => 
!brokers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is 
never deleted")
-    validateTopicIsDeleted(timeoutTopic)
-  }
-
-  private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, 
expectedResponse: Map[String, Errors]): Unit = {
-    val response = sendDeleteTopicsRequest(request)
-    val errors = response.data.responses
-
-    val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2)
-    assertEquals(expectedResponse.size, errorCount, "The response size should 
match")
-
-    expectedResponse.foreach { case (topic, expectedError) =>
-      assertEquals(expectedResponse(topic).code, errors.find(topic).errorCode, 
"The response error should match")
-      // If no error validate the topic was deleted
-      if (expectedError == Errors.NONE) {
-        validateTopicIsDeleted(topic)
-      }
-    }
-  }
-
-  private def validateErrorDeleteTopicRequestsWithIds(request: 
DeleteTopicsRequest, expectedResponse: Map[Uuid, Errors]): Unit = {
-    val response = sendDeleteTopicsRequest(request)
-    val responses = response.data.responses
-    val errors = responses.asScala.map(result => result.topicId() -> 
result.errorCode()).toMap
-    val names = responses.asScala.map(result => result.topicId() -> 
result.name()).toMap
-
-    val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2)
-    assertEquals(expectedResponse.size, errorCount, "The response size should 
match")
-
-    expectedResponse.foreach { case (topic, expectedError) =>
-      assertEquals(expectedResponse(topic).code, errors(topic), "The response 
error should match")
-      // If no error validate the topic was deleted
-      if (expectedError == Errors.NONE) {
-        validateTopicIsDeleted(names(topic))
-      }
-    }
-  }
-
   private def validateTopicIsDeleted(topic: String): Unit = {
     val metadata = connectAndReceive[MetadataResponse](new 
MetadataRequest.Builder(
       List(topic).asJava, true).build).topicMetadata.asScala
@@ -230,7 +139,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with 
Logging {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("kraft"))
   def testDeleteTopicsVersions(quorum: String): Unit = {
     // This test assumes that the current valid versions are 0-6 please adjust 
the test if there are changes.
     assertEquals(0, DeleteTopicsRequestData.LOWEST_SUPPORTED_VERSION)
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
index bb797a8887e..86f41a72972 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
@@ -19,13 +19,14 @@ package org.apache.kafka.tools.consumer.group;
 import kafka.api.AbstractSaslTest;
 import kafka.api.Both$;
 import kafka.security.JaasTestUtils;
-import kafka.zk.ConfigEntityChangeNotificationZNode;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.metadata.storage.Formatter;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -38,8 +39,12 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
 import scala.Some$;
@@ -89,9 +94,13 @@ public class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     @Override
     public void configureSecurityBeforeServersStart(TestInfo testInfo) {
         super.configureSecurityBeforeServersStart(testInfo);
-        
zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path());
-        // Create broker credentials before starting brokers
-        createScramCredentials(zkConnect(), JaasTestUtils.KAFKA_SCRAM_ADMIN, 
JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD);
+    }
+
+    @Override
+    public void addFormatterSettings(Formatter formatter) {
+        formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ");
+        formatter.setScramArguments(Arrays.asList("SCRAM-SHA-256=[name=" + 
JaasTestUtils.KAFKA_SCRAM_ADMIN +
+            ",password=" + JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD + "]"));
     }
 
     @Override
@@ -106,13 +115,13 @@ public class SaslClientsWithInvalidCredentialsTest 
extends AbstractSaslTest {
         startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, 
Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$,
             JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME));
         super.setUp(testInfo);
-        createTopic(
-            TOPIC,
-            NUM_PARTITIONS,
-            BROKER_COUNT,
-            new Properties(),
-            listenerName(),
-            new Properties());
+        try (Admin admin = createPrivilegedAdminClient()) {
+            admin.createTopics(Collections.singletonList(
+                new NewTopic(TOPIC, NUM_PARTITIONS, (short) 
BROKER_COUNT))).all().
+                    get(5, TimeUnit.MINUTES);
+        } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @AfterEach
@@ -124,7 +133,7 @@ public class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
 
     // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() 
in the ParameterizedTest name.
     @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
-    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
     public void testConsumerGroupServiceWithAuthenticationFailure(String 
quorum, String groupProtocol) throws Exception {
         ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
prepareConsumerGroupService();
         try (Consumer<byte[], byte[]> consumer = createConsumer()) {
@@ -137,7 +146,7 @@ public class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     }
 
     @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
-    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")
+    
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
     public void testConsumerGroupServiceWithAuthenticationSuccess(String 
quorum, String groupProtocol) throws Exception {
         
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2,
 JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
         ConsumerGroupCommand.ConsumerGroupService consumerGroupService = 
prepareConsumerGroupService();

Reply via email to