This is an automated email from the ASF dual-hosted git repository.
dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 095bd0a6d46 KAFKA-18101: Merge duplicate assertFutureThrows and
assertFutureExceptionTypeEquals (#17991)
095bd0a6d46 is described below
commit 095bd0a6d46ed06f1ec26957fc9d2ab7063eef2c
Author: Peter Lee <[email protected]>
AuthorDate: Wed Dec 4 14:15:03 2024 +0800
KAFKA-18101: Merge duplicate assertFutureThrows and
assertFutureExceptionTypeEquals (#17991)
Reviewers: Ziming Deng<[email protected]>, Chia-Ping
Tsai<[email protected]>, TaiJuWu<[email protected]>.
---
.../AdminClientWithPoliciesIntegrationTest.scala | 14 +--
.../kafka/api/AuthorizerIntegrationTest.scala | 14 +--
.../kafka/api/BaseAdminIntegrationTest.scala | 13 +--
.../kafka/api/PlaintextAdminIntegrationTest.scala | 106 ++++++++++-----------
.../kafka/api/SaslSslAdminIntegrationTest.scala | 19 ++--
.../kafka/server/DynamicConfigChangeTest.scala | 5 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 9 --
7 files changed, 87 insertions(+), 93 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 562fa12d036..d6d74c5b941 100644
---
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -17,7 +17,6 @@ import java.util
import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, AlterConfigsOptions, ConfigEntry}
@@ -28,6 +27,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.policy.AlterConfigPolicy
import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
@@ -155,10 +155,10 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
alterResult = client.incrementalAlterConfigs(alterConfigs)
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
+ assertFutureThrows(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
- assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3),
classOf[InvalidConfigurationException])
- assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
+ assertFutureThrows(alterResult.values.get(topicResource3),
classOf[InvalidConfigurationException])
+ assertFutureThrows(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
assertTrue(validationsForResource(brokerResource).isEmpty,
"Should not see the broker resource in the AlterConfig policy when the
broker configs are not being updated.")
validations.clear()
@@ -184,10 +184,10 @@ class AdminClientWithPoliciesIntegrationTest extends
KafkaServerTestHarness with
alterResult = client.incrementalAlterConfigs(alterConfigs, new
AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, topicResource3,
brokerResource).asJava, alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
+ assertFutureThrows(alterResult.values.get(topicResource1),
classOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
- assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3),
classOf[InvalidConfigurationException])
- assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
+ assertFutureThrows(alterResult.values.get(topicResource3),
classOf[InvalidConfigurationException])
+ assertFutureThrows(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
assertTrue(validationsForResource(brokerResource).isEmpty,
"Should not see the broker resource in the AlterConfig policy when the
broker configs are not being updated.")
validations.clear()
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0b24ba217ac..83d560960b2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1601,7 +1601,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
def testDescribeGroupApiWithNoGroupAcl(quorum: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
-
TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group),
classOf[GroupAuthorizationException])
+ JTestUtils.assertFutureThrows(result.describedGroups().get(group),
classOf[GroupAuthorizationException])
}
@ParameterizedTest
@@ -1687,14 +1687,14 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
consumer.assign(List(tp).asJava)
consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
-
TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group),
classOf[GroupAuthorizationException])
+ JTestUtils.assertFutureThrows(result.deletedGroups().get(group),
classOf[GroupAuthorizationException])
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDeleteGroupApiWithNoDeleteGroupAcl2(quorum: String): Unit = {
val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
-
TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group),
classOf[GroupAuthorizationException])
+ JTestUtils.assertFutureThrows(result.deletedGroups().get(group),
classOf[GroupAuthorizationException])
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -1725,7 +1725,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
consumer.close()
val result = createAdminClient().deleteConsumerGroupOffsets(group,
Set(tp).asJava)
- TestUtils.assertFutureExceptionTypeEquals(result.all(),
classOf[GroupAuthorizationException])
+ JTestUtils.assertFutureThrows(result.all(),
classOf[GroupAuthorizationException])
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -1745,15 +1745,15 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, DELETE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), groupResource)
val result = createAdminClient().deleteConsumerGroupOffsets(group,
Set(tp).asJava)
- TestUtils.assertFutureExceptionTypeEquals(result.all(),
classOf[TopicAuthorizationException])
- TestUtils.assertFutureExceptionTypeEquals(result.partitionResult(tp),
classOf[TopicAuthorizationException])
+ JTestUtils.assertFutureThrows(result.all(),
classOf[TopicAuthorizationException])
+ JTestUtils.assertFutureThrows(result.partitionResult(tp),
classOf[TopicAuthorizationException])
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDeleteGroupOffsetsWithNoAcl(quorum: String): Unit = {
val result = createAdminClient().deleteConsumerGroupOffsets(group,
Set(tp).asJava)
- TestUtils.assertFutureExceptionTypeEquals(result.all(),
classOf[GroupAuthorizationException])
+ JTestUtils.assertFutureThrows(result.all(),
classOf[GroupAuthorizationException])
}
@ParameterizedTest
diff --git
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 6a877bd6361..3fc63c59526 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
+import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@@ -108,14 +109,14 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
val failedCreateResult = client.createTopics(newTopics.asJava)
val results = failedCreateResult.values()
assertTrue(results.containsKey("mytopic"))
- assertFutureExceptionTypeEquals(results.get("mytopic"),
classOf[TopicExistsException])
+ assertFutureThrows(results.get("mytopic"), classOf[TopicExistsException])
assertTrue(results.containsKey("mytopic2"))
- assertFutureExceptionTypeEquals(results.get("mytopic2"),
classOf[TopicExistsException])
+ assertFutureThrows(results.get("mytopic2"), classOf[TopicExistsException])
assertTrue(results.containsKey("mytopic3"))
- assertFutureExceptionTypeEquals(results.get("mytopic3"),
classOf[TopicExistsException])
-
assertFutureExceptionTypeEquals(failedCreateResult.numPartitions("mytopic3"),
classOf[TopicExistsException])
-
assertFutureExceptionTypeEquals(failedCreateResult.replicationFactor("mytopic3"),
classOf[TopicExistsException])
- assertFutureExceptionTypeEquals(failedCreateResult.config("mytopic3"),
classOf[TopicExistsException])
+ assertFutureThrows(results.get("mytopic3"), classOf[TopicExistsException])
+ assertFutureThrows(failedCreateResult.numPartitions("mytopic3"),
classOf[TopicExistsException])
+ assertFutureThrows(failedCreateResult.replicationFactor("mytopic3"),
classOf[TopicExistsException])
+ assertFutureThrows(failedCreateResult.config("mytopic3"),
classOf[TopicExistsException])
val topicToDescription =
client.describeTopics(topics.asJava).allTopicNames.get()
assertEquals(topics.toSet, topicToDescription.keySet.asScala)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 2eb773c0614..b4d06d9f993 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs,
ServerLogConfigs, ZkConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogFileUtils}
-import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
+import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS,
assertFutureThrows}
import org.apache.log4j.PropertyConfigurator
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@@ -726,7 +726,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val nonExistingTopic = "non-existing"
val results = client.describeTopics(Seq(nonExistingTopic,
existingTopic).asJava).topicNameValues()
assertEquals(existingTopic, results.get(existingTopic).get.name)
- assertFutureExceptionTypeEquals(results.get(nonExistingTopic),
classOf[UnknownTopicOrPartitionException])
+ assertFutureThrows(results.get(nonExistingTopic),
classOf[UnknownTopicOrPartitionException])
}
@ParameterizedTest
@@ -745,7 +745,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val results =
client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId,
nonExistingTopicId).asJava)).topicIdValues()
assertEquals(existingTopicId, results.get(existingTopicId).get.topicId())
- assertFutureExceptionTypeEquals(results.get(nonExistingTopicId),
classOf[UnknownTopicIdException])
+ assertFutureThrows(results.get(nonExistingTopicId),
classOf[UnknownTopicIdException])
}
@ParameterizedTest
@@ -1104,8 +1104,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
groupResource -> groupAlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
- assertFutureExceptionTypeEquals(alterResult.values.get(groupResource),
classOf[InvalidConfigurationException],
- Some("consumer.session.timeout.ms must be greater than or equal to
group.consumer.min.session.timeout.ms"))
+ assertFutureThrows(alterResult.values.get(groupResource),
classOf[InvalidConfigurationException],
+ "consumer.session.timeout.ms must be greater than or equal to
group.consumer.min.session.timeout.ms")
}
@ParameterizedTest
@@ -1707,10 +1707,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC,
"mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE,
AclPermissionType.ALLOW))
client = createAdminClient
-
assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(),
classOf[SecurityDisabledException])
-
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl)).all(),
+ assertFutureThrows(client.describeAcls(AclBindingFilter.ANY).values(),
classOf[SecurityDisabledException])
+ assertFutureThrows(client.createAcls(Collections.singleton(acl)).all(),
classOf[SecurityDisabledException])
-
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(acl.toFilter())).all(),
+
assertFutureThrows(client.deleteAcls(Collections.singleton(acl.toFilter())).all(),
classOf[SecurityDisabledException])
}
@@ -1727,7 +1727,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val future = client.createTopics(newTopics.asJava, new
CreateTopicsOptions().validateOnly(true)).all()
client.close(time.Duration.ofHours(2))
val future2 = client.createTopics(newTopics.asJava, new
CreateTopicsOptions().validateOnly(true)).all()
- assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException])
+ assertFutureThrows(future2, classOf[IllegalStateException])
future.get
client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout
should have no effect
}
@@ -1747,7 +1747,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new
NewTopic(_, 1, 1.toShort)).asJava,
new CreateTopicsOptions().timeoutMs(900000)).all()
client.close(time.Duration.ZERO)
- assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+ assertFutureThrows(future, classOf[TimeoutException])
}
/**
@@ -1764,7 +1764,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val startTimeMs = Time.SYSTEM.milliseconds()
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new
NewTopic(_, 1, 1.toShort)).asJava,
new CreateTopicsOptions().timeoutMs(2)).all()
- assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+ assertFutureThrows(future, classOf[TimeoutException])
val endTimeMs = Time.SYSTEM.milliseconds()
assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least
one millisecond.")
}
@@ -1782,7 +1782,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client = KafkaAdminClientTest.createInternal(new
AdminClientConfig(config), factory)
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new
NewTopic(_, 1, 1.toShort)).asJava,
new CreateTopicsOptions().validateOnly(true)).all()
- assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+ assertFutureThrows(future, classOf[TimeoutException])
val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new
NewTopic(_, 1, 1.toShort)).asJava,
new CreateTopicsOptions().validateOnly(true)).all()
future2.get
@@ -1982,9 +1982,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Collections.singleton(new MemberToRemove(invalidInstanceId))
))
- TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all,
classOf[UnknownMemberIdException])
+ assertFutureThrows(removeMembersResult.all,
classOf[UnknownMemberIdException])
val firstMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(invalidInstanceId))
- TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture,
classOf[UnknownMemberIdException])
+ assertFutureThrows(firstMemberFuture,
classOf[UnknownMemberIdException])
// Test consumer group deletion
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId,
fakeGroupId).asJava)
@@ -1992,12 +1992,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Deleting the fake group ID should get GroupIdNotFoundException.
assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
-
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+ assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
classOf[GroupIdNotFoundException])
// Deleting the real group ID should get GroupNotEmptyException
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
-
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+ assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
// Test delete one correct static member
@@ -2255,9 +2255,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
Collections.singleton(new MemberToRemove(invalidInstanceId))
))
- TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all,
classOf[UnknownMemberIdException])
+ assertFutureThrows(removeMembersResult.all,
classOf[UnknownMemberIdException])
val firstMemberFuture = removeMembersResult.memberResult(new
MemberToRemove(invalidInstanceId))
- TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture,
classOf[UnknownMemberIdException])
+ assertFutureThrows(firstMemberFuture,
classOf[UnknownMemberIdException])
// Test consumer group deletion
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId,
fakeGroupId).asJava)
@@ -2265,12 +2265,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Deleting the fake group ID should get GroupIdNotFoundException.
assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
-
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+ assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
classOf[GroupIdNotFoundException])
// Deleting the real group ID should get GroupNotEmptyException
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
-
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+ assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
// Test delete one correct static member
@@ -2379,29 +2379,29 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val offsetDeleteResult =
client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
// Top level error will equal to the first partition level error
- assertFutureExceptionTypeEquals(offsetDeleteResult.all(),
classOf[GroupSubscribedToTopicException])
-
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp1),
+ assertFutureThrows(offsetDeleteResult.all(),
classOf[GroupSubscribedToTopicException])
+ assertFutureThrows(offsetDeleteResult.partitionResult(tp1),
classOf[GroupSubscribedToTopicException])
-
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
+ assertFutureThrows(offsetDeleteResult.partitionResult(tp2),
classOf[UnknownTopicOrPartitionException])
// Test the fake group ID
val fakeDeleteResult = client.deleteConsumerGroupOffsets(fakeGroupId,
Set(tp1, tp2).asJava)
- assertFutureExceptionTypeEquals(fakeDeleteResult.all(),
classOf[GroupIdNotFoundException])
- assertFutureExceptionTypeEquals(fakeDeleteResult.partitionResult(tp1),
+ assertFutureThrows(fakeDeleteResult.all(),
classOf[GroupIdNotFoundException])
+ assertFutureThrows(fakeDeleteResult.partitionResult(tp1),
classOf[GroupIdNotFoundException])
- assertFutureExceptionTypeEquals(fakeDeleteResult.partitionResult(tp2),
+ assertFutureThrows(fakeDeleteResult.partitionResult(tp2),
classOf[GroupIdNotFoundException])
}
// Test offset deletion when group is empty
val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId,
Set(tp1, tp2).asJava)
- assertFutureExceptionTypeEquals(offsetDeleteResult.all(),
+ assertFutureThrows(offsetDeleteResult.all(),
classOf[UnknownTopicOrPartitionException])
assertNull(offsetDeleteResult.partitionResult(tp1).get())
- assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
+ assertFutureThrows(offsetDeleteResult.partitionResult(tp2),
classOf[UnknownTopicOrPartitionException])
} finally {
Utils.closeQuietly(client, "adminClient")
@@ -3213,8 +3213,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
topic1Resource -> topic1AlterConfigs
).asJava, new AlterConfigsOptions().validateOnly(true))
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
- Some("Invalid value zip for configuration compression.type"))
+ assertFutureThrows(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
+ "Invalid value zip for configuration compression.type: String must be
one of: uncompressed, zstd, lz4, snappy, gzip, producer")
}
@ParameterizedTest
@@ -3362,8 +3362,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertEquals(Set(topic1Resource, topic2Resource).asJava,
alterResult.values.keySet)
// InvalidRequestException error for topic1
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidRequestException],
- Some("Error due to duplicate config keys"))
+ assertFutureThrows(alterResult.values().get(topic1Resource),
classOf[InvalidRequestException],
+ "Error due to duplicate config keys")
// Operation should succeed for topic2
alterResult.values().get(topic2Resource).get()
@@ -3393,11 +3393,11 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJava)
assertEquals(Set(topic1Resource, topic2Resource).asJava,
alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
- Some("Can't APPEND to key compression.type because its type is not
LIST."))
+ assertFutureThrows(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
+ "Can't APPEND to key compression.type because its type is not LIST.")
- assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource),
classOf[InvalidConfigurationException],
- Some("Can't SUBTRACT to key compression.type because its type is not
LIST."))
+ assertFutureThrows(alterResult.values().get(topic2Resource),
classOf[InvalidConfigurationException],
+ "Can't SUBTRACT to key compression.type because its type is not LIST.")
// Try to add invalid config
topic1AlterConfigs = Seq(
@@ -3409,8 +3409,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
).asJava)
assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
- Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio:
Value must be no more than 1"))
+ assertFutureThrows(alterResult.values().get(topic1Resource),
classOf[InvalidConfigurationException],
+ "Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value
must be no more than 1")
}
@ParameterizedTest
@@ -3437,8 +3437,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
nonExistentTp1 -> validAssignment,
nonExistentTp2 -> validAssignment
).asJava).values()
-
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1),
classOf[UnknownTopicOrPartitionException])
-
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2),
classOf[UnknownTopicOrPartitionException])
+ assertFutureThrows(nonExistentPartitionsResult.get(nonExistentTp1),
classOf[UnknownTopicOrPartitionException])
+ assertFutureThrows(nonExistentPartitionsResult.get(nonExistentTp2),
classOf[UnknownTopicOrPartitionException])
val extraNonExistentReplica = Optional.of(new NewPartitionReassignment((0
until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava))
val negativeIdReplica = Optional.of(new NewPartitionReassignment(Seq(-3,
-2, -1).map(_.asInstanceOf[Integer]).asJava))
@@ -3448,9 +3448,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
tp2 -> negativeIdReplica,
tp3 -> duplicateReplica
).asJava).values()
- assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1),
classOf[InvalidReplicaAssignmentException])
- assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2),
classOf[InvalidReplicaAssignmentException])
- assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3),
classOf[InvalidReplicaAssignmentException])
+ assertFutureThrows(invalidReplicaResult.get(tp1),
classOf[InvalidReplicaAssignmentException])
+ assertFutureThrows(invalidReplicaResult.get(tp2),
classOf[InvalidReplicaAssignmentException])
+ assertFutureThrows(invalidReplicaResult.get(tp3),
classOf[InvalidReplicaAssignmentException])
}
@ParameterizedTest
@@ -3465,8 +3465,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(results.containsKey(longTopicName))
results.get(longTopicName).get()
assertTrue(results.containsKey(invalidTopicName))
- assertFutureExceptionTypeEquals(results.get(invalidTopicName),
classOf[InvalidTopicException])
- assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
+ assertFutureThrows(results.get(invalidTopicName),
classOf[InvalidTopicException])
+ assertFutureThrows(client.alterReplicaLogDirs(
Map(new TopicPartitionReplica(longTopicName, 0, 0) ->
brokers(0).config.logDirs(0)).asJava).all(),
classOf[InvalidTopicException])
client.close()
@@ -3493,10 +3493,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TopicConfig.COMPRESSION_TYPE_CONFIG -> "producer"
).asJava
val newTopic = new NewTopic(topic, 2, brokerCount.toShort)
- assertFutureExceptionTypeEquals(
+ assertFutureThrows(
client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all,
classOf[InvalidConfigurationException],
- Some("Null value not supported for topic configs: retention.bytes")
+ "Null value not supported for topic configs: retention.bytes"
)
val validConfigs = Map[String, String](TopicConfig.COMPRESSION_TYPE_CONFIG
-> "producer").asJava
@@ -3509,10 +3509,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG,
null), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG,
"lz4"), AlterConfigOp.OpType.SET)
)
- assertFutureExceptionTypeEquals(
+ assertFutureThrows(
client.incrementalAlterConfigs(Map(topicResource ->
alterOps.asJavaCollection).asJava).all,
classOf[InvalidRequestException],
- Some("Null value not supported for : retention.bytes")
+ "Null value not supported for : retention.bytes"
)
validateLogConfig(compressionType = "producer")
}
@@ -3932,9 +3932,9 @@ object PlaintextAdminIntegrationTest {
var alterResult = admin.incrementalAlterConfigs(alterConfigs)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
+ assertFutureThrows(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
alterResult.values.get(topicResource2).get
- assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
+ assertFutureThrows(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
// Verify that first and third resources were not updated and second was
updated
test.ensureConsistentKRaftMetadata()
@@ -3961,9 +3961,9 @@ object PlaintextAdminIntegrationTest {
alterResult = admin.incrementalAlterConfigs(alterConfigs, new
AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava,
alterResult.values.keySet)
- assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
+ assertFutureThrows(alterResult.values.get(topicResource1),
classOf[InvalidConfigurationException])
alterResult.values.get(topicResource2).get
- assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
+ assertFutureThrows(alterResult.values.get(brokerResource),
classOf[InvalidRequestException])
// Verify that no resources are updated since validate_only = true
test.ensureConsistentKRaftMetadata()
diff --git
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 14d8bbdcaac..3b08e1465f1 100644
---
a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -34,6 +34,7 @@ import
org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConf
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
@@ -216,7 +217,7 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN,
AclPermissionType.ALLOW))
val results2 = client.createAcls(List(aclUnknown).asJava)
assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
- assertFutureExceptionTypeEquals(results2.all,
classOf[InvalidRequestException])
+ assertFutureThrows(results2.all, classOf[InvalidRequestException])
val results3 = client.deleteAcls(List(acl.toFilter, acl2.toFilter,
acl3.toFilter).asJava).values
assertEquals(Set(acl.toFilter, acl2.toFilter, acl3.toFilter),
results3.keySet.asScala)
assertEquals(0, results3.get(acl.toFilter).get.values.size())
@@ -403,8 +404,8 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ,
AclPermissionType.ALLOW))
val results = client.createAcls(List(clusterAcl,
emptyResourceNameAcl).asJava, new CreateAclsOptions())
assertEquals(Set(clusterAcl, emptyResourceNameAcl),
results.values.keySet().asScala)
- assertFutureExceptionTypeEquals(results.values.get(clusterAcl),
classOf[InvalidRequestException])
- assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl),
classOf[InvalidRequestException])
+ assertFutureThrows(results.values.get(clusterAcl),
classOf[InvalidRequestException])
+ assertFutureThrows(results.values.get(emptyResourceNameAcl),
classOf[InvalidRequestException])
}
@ParameterizedTest
@@ -601,9 +602,9 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, compressionConfig.value)
assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG,
compressionConfig.source)
- assertFutureExceptionTypeEquals(result.numPartitions(topic2),
classOf[TopicAuthorizationException])
- assertFutureExceptionTypeEquals(result.replicationFactor(topic2),
classOf[TopicAuthorizationException])
- assertFutureExceptionTypeEquals(result.config(topic2),
classOf[TopicAuthorizationException])
+ assertFutureThrows(result.numPartitions(topic2),
classOf[TopicAuthorizationException])
+ assertFutureThrows(result.replicationFactor(topic2),
classOf[TopicAuthorizationException])
+ assertFutureThrows(result.config(topic2),
classOf[TopicAuthorizationException])
}
validateMetadataAndConfigs(validateResult)
@@ -614,7 +615,7 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val topicIds = getTopicIds()
assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get())
assertEquals(topicIds(topic1), createResult.topicId(topic1).get())
- assertFutureExceptionTypeEquals(createResult.topicId(topic2),
classOf[TopicAuthorizationException])
+ assertFutureThrows(createResult.topicId(topic2),
classOf[TopicAuthorizationException])
val createResponseConfig =
createResult.config(topic1).get().entries.asScala
@@ -637,7 +638,7 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val createDelegationTokenOptions = new
CreateDelegationTokenOptions().maxLifetimeMs(5000)
// Test expiration for non-exists token
- TestUtils.assertFutureExceptionTypeEquals(
+ assertFutureThrows(
client.expireDelegationToken("".getBytes()).expiryTimestamp(),
classOf[DelegationTokenNotFoundException]
)
@@ -650,7 +651,7 @@ class SaslSslAdminIntegrationTest extends
BaseAdminIntegrationTest with SaslSetu
val token2 =
client.createDelegationToken(createDelegationTokenOptions.maxLifetimeMs(1000)).delegationToken().get()
// Ensure current time > maxLifeTimeMs of token
Thread.sleep(1000)
- TestUtils.assertFutureExceptionTypeEquals(
+ assertFutureThrows(
client.expireDelegationToken(token2.hmac(), new
ExpireDelegationTokenOptions().expiryTimePeriodMs(1)).expiryTimestamp(),
classOf[DelegationTokenExpiredException]
)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index eb63f4b521e..eaca2cc2c95 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
@@ -320,7 +321,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
val op = new AlterConfigOp(new
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "200000"), OpType.SET)
val future = admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all
- TestUtils.assertFutureExceptionTypeEquals(future,
classOf[InvalidRequestException])
+ assertFutureThrows(future, classOf[InvalidRequestException])
} finally {
admin.close()
}
@@ -471,7 +472,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
val resource = new ConfigResource(ConfigResource.Type.GROUP, "")
val op = new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "200000"),
OpType.SET)
val future = admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all
- TestUtils.assertFutureExceptionTypeEquals(future,
classOf[InvalidRequestException])
+ assertFutureThrows(future, classOf[InvalidRequestException])
} finally {
admin.close()
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 298168a5310..bb25bc9379c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1558,15 +1558,6 @@ object TestUtils extends Logging {
)
}
- def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_
<: Throwable],
- expectedErrorMessage: Option[String] =
None): Unit = {
- val cause = assertThrows(classOf[ExecutionException], () =>
future.get()).getCause
- assertTrue(clazz.isInstance(cause), "Expected an exception of type " +
clazz.getName + "; got type " +
- cause.getClass.getName)
- expectedErrorMessage.foreach(message =>
assertTrue(cause.getMessage.contains(message), s"Received error message :
${cause.getMessage}" +
- s" does not contain expected error message : $message"))
- }
-
def assertBadConfigContainingMessage(props: Properties,
expectedExceptionContainsText: String): Unit = {
try {
KafkaConfig.fromProps(props)