This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdf3aab661f MINOR: remove zk from several tests (#17949)
cdf3aab661f is described below
commit cdf3aab661fb975064bb1bffb61be5bbff138b62
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Nov 27 12:52:04 2024 -0800
MINOR: remove zk from several tests (#17949)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/admin/RemoteTopicCrudTest.scala | 33 +-------
.../kafka/server/GssapiAuthenticationTest.scala | 2 +-
.../kafka/server/DeleteTopicsRequestTest.scala | 93 +---------------------
.../SaslClientsWithInvalidCredentialsTest.java | 35 +++++---
4 files changed, 26 insertions(+), 137 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 95c0b015ac5..e24731261ac 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -444,35 +444,6 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Disabling
remote storage feature on the topic level is not supported.")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk"))
- def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
- val admin = createAdminClient()
- val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or
`remote.log.copy.disable` under Zookeeper's mode."
- val topicConfig = new Properties
- topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
- TestUtils.createTopicWithAdmin(admin, testTopicName, brokers,
controllerServers, numPartitions, numReplicationFactor,
- topicConfig = topicConfig)
-
- val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
- configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
- util.Arrays.asList(
- new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
- AlterConfigOp.OpType.SET),
- ))
- assertThrowsException(classOf[InvalidConfigurationException],
- () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
-
- configs.clear()
- configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
- util.Arrays.asList(
- new AlterConfigOp(new
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
- AlterConfigOp.OpType.SET),
- ))
- assertThrowsException(classOf[InvalidConfigurationException],
- () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
- }
-
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTopicDeletion(quorum: String): Unit = {
@@ -501,7 +472,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
- val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
recreateBrokers(startup = true)
@@ -519,7 +490,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
- val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
recreateBrokers(startup = true)
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 125411168d4..fd6cac4a532 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -181,7 +181,7 @@ class GssapiAuthenticationTest extends
IntegrationTestHarness with SaslSetup {
* is thrown immediately, and is not affected by
<code>connection.failed.authentication.delay.ms</code>.
*/
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testServerAuthenticationFailure(quorum: String, groupProtocol: String):
Unit = {
// Setup client with a non-existent service principal, so that server
authentication fails on the client
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism,
Some("another-kafka-service"))
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 74ef9d96964..940703c828c 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -20,7 +20,6 @@ package kafka.server
import java.util
import kafka.network.SocketServer
import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.protocol.Errors
@@ -125,96 +124,6 @@ class DeleteTopicsRequestTest extends BaseRequestTest with
Logging {
}
}
- /*
- * Only run this test against ZK cluster. The KRaft controller doesn't
perform operations that have timed out.
- */
- @ParameterizedTest
- @ValueSource(strings = Array("zk"))
- def testErrorDeleteTopicRequests(quorum: String): Unit = {
- val timeout = 30000
- val timeoutTopic = "invalid-timeout"
-
- // Basic
- validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData()
- .setTopicNames(util.Arrays.asList("invalid-topic"))
- .setTimeoutMs(timeout)).build(),
- Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
-
- // Partial
- createTopic("partial-topic-1")
- validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData()
- .setTopicNames(util.Arrays.asList("partial-topic-1",
"partial-invalid-topic"))
- .setTimeoutMs(timeout)).build(),
- Map(
- "partial-topic-1" -> Errors.NONE,
- "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
- )
- )
-
- // Topic IDs
- createTopic("topic-id-1")
- val validId = getTopicIds()("topic-id-1")
- val invalidId = Uuid.randomUuid
- validateErrorDeleteTopicRequestsWithIds(new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData()
- .setTopics(util.Arrays.asList(new
DeleteTopicState().setTopicId(invalidId),
- new DeleteTopicState().setTopicId(validId)))
- .setTimeoutMs(timeout)).build(),
- Map(
- invalidId -> Errors.UNKNOWN_TOPIC_ID,
- validId -> Errors.NONE
- )
- )
-
- // Timeout
- createTopic(timeoutTopic, 5, 2)
- // Must be a 0ms timeout to avoid transient test failures. Even a timeout
of 1ms has succeeded in the past.
- validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData()
- .setTopicNames(util.Arrays.asList(timeoutTopic))
- .setTimeoutMs(0)).build(),
- Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
- // The topic should still get deleted eventually
- TestUtils.waitUntilTrue(() =>
!brokers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is
never deleted")
- validateTopicIsDeleted(timeoutTopic)
- }
-
- private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest,
expectedResponse: Map[String, Errors]): Unit = {
- val response = sendDeleteTopicsRequest(request)
- val errors = response.data.responses
-
- val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2)
- assertEquals(expectedResponse.size, errorCount, "The response size should
match")
-
- expectedResponse.foreach { case (topic, expectedError) =>
- assertEquals(expectedResponse(topic).code, errors.find(topic).errorCode,
"The response error should match")
- // If no error validate the topic was deleted
- if (expectedError == Errors.NONE) {
- validateTopicIsDeleted(topic)
- }
- }
- }
-
- private def validateErrorDeleteTopicRequestsWithIds(request:
DeleteTopicsRequest, expectedResponse: Map[Uuid, Errors]): Unit = {
- val response = sendDeleteTopicsRequest(request)
- val responses = response.data.responses
- val errors = responses.asScala.map(result => result.topicId() ->
result.errorCode()).toMap
- val names = responses.asScala.map(result => result.topicId() ->
result.name()).toMap
-
- val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2)
- assertEquals(expectedResponse.size, errorCount, "The response size should
match")
-
- expectedResponse.foreach { case (topic, expectedError) =>
- assertEquals(expectedResponse(topic).code, errors(topic), "The response
error should match")
- // If no error validate the topic was deleted
- if (expectedError == Errors.NONE) {
- validateTopicIsDeleted(names(topic))
- }
- }
- }
-
private def validateTopicIsDeleted(topic: String): Unit = {
val metadata = connectAndReceive[MetadataResponse](new
MetadataRequest.Builder(
List(topic).asJava, true).build).topicMetadata.asScala
@@ -230,7 +139,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with
Logging {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk"))
+ @ValueSource(strings = Array("kraft"))
def testDeleteTopicsVersions(quorum: String): Unit = {
// This test assumes that the current valid versions are 0-6 please adjust
the test if there are changes.
assertEquals(0, DeleteTopicsRequestData.LOWEST_SUPPORTED_VERSION)
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
index bb797a8887e..86f41a72972 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java
@@ -19,13 +19,14 @@ package org.apache.kafka.tools.consumer.group;
import kafka.api.AbstractSaslTest;
import kafka.api.Both$;
import kafka.security.JaasTestUtils;
-import kafka.zk.ConfigEntityChangeNotificationZNode;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -38,8 +39,12 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import scala.Option;
import scala.Some$;
@@ -89,9 +94,13 @@ public class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
@Override
public void configureSecurityBeforeServersStart(TestInfo testInfo) {
super.configureSecurityBeforeServersStart(testInfo);
- zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path());
- // Create broker credentials before starting brokers
- createScramCredentials(zkConnect(), JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD);
+ }
+
+ @Override
+ public void addFormatterSettings(Formatter formatter) {
+ formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ");
+ formatter.setScramArguments(Arrays.asList("SCRAM-SHA-256=[name=" +
JaasTestUtils.KAFKA_SCRAM_ADMIN +
+ ",password=" + JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD + "]"));
}
@Override
@@ -106,13 +115,13 @@ public class SaslClientsWithInvalidCredentialsTest
extends AbstractSaslTest {
startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS,
Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$,
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME));
super.setUp(testInfo);
- createTopic(
- TOPIC,
- NUM_PARTITIONS,
- BROKER_COUNT,
- new Properties(),
- listenerName(),
- new Properties());
+ try (Admin admin = createPrivilegedAdminClient()) {
+ admin.createTopics(Collections.singletonList(
+ new NewTopic(TOPIC, NUM_PARTITIONS, (short)
BROKER_COUNT))).all().
+ get(5, TimeUnit.MINUTES);
+ } catch (ExecutionException | InterruptedException | TimeoutException
e) {
+ throw new RuntimeException(e);
+ }
}
@AfterEach
@@ -124,7 +133,7 @@ public class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName()
in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
- @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")
+ @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
public void testConsumerGroupServiceWithAuthenticationFailure(String
quorum, String groupProtocol) throws Exception {
ConsumerGroupCommand.ConsumerGroupService consumerGroupService =
prepareConsumerGroupService();
try (Consumer<byte[], byte[]> consumer = createConsumer()) {
@@ -137,7 +146,7 @@ public class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
- @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")
+ @MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
public void testConsumerGroupServiceWithAuthenticationSuccess(String
quorum, String groupProtocol) throws Exception {
createScramCredentialsViaPrivilegedAdminClient(JaasTestUtils.KAFKA_SCRAM_USER_2,
JaasTestUtils.KAFKA_SCRAM_PASSWORD_2);
ConsumerGroupCommand.ConsumerGroupService consumerGroupService =
prepareConsumerGroupService();