This is an automated email from the ASF dual-hosted git repository.

dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 095bd0a6d46 KAFKA-18101: Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals (#17991)
095bd0a6d46 is described below

commit 095bd0a6d46ed06f1ec26957fc9d2ab7063eef2c
Author: Peter Lee <[email protected]>
AuthorDate: Wed Dec 4 14:15:03 2024 +0800

    KAFKA-18101: Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals (#17991)
    
    Reviewers: Ziming Deng <[email protected]>, Chia-Ping Tsai <[email protected]>, TaiJuWu <[email protected]>.
---
 .../AdminClientWithPoliciesIntegrationTest.scala   |  14 +--
 .../kafka/api/AuthorizerIntegrationTest.scala      |  14 +--
 .../kafka/api/BaseAdminIntegrationTest.scala       |  13 +--
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 106 ++++++++++-----------
 .../kafka/api/SaslSslAdminIntegrationTest.scala    |  19 ++--
 .../kafka/server/DynamicConfigChangeTest.scala     |   5 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   9 --
 7 files changed, 87 insertions(+), 93 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 562fa12d036..d6d74c5b941 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -17,7 +17,6 @@ import java.util
 import java.util.{Collections, Properties}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry}
@@ -28,6 +27,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
@@ -155,10 +155,10 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     alterResult = client.incrementalAlterConfigs(alterConfigs)
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
-    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
+    assertFutureThrows(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), 
classOf[InvalidConfigurationException])
-    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
+    assertFutureThrows(alterResult.values.get(topicResource3), 
classOf[InvalidConfigurationException])
+    assertFutureThrows(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
     assertTrue(validationsForResource(brokerResource).isEmpty,
       "Should not see the broker resource in the AlterConfig policy when the 
broker configs are not being updated.")
     validations.clear()
@@ -184,10 +184,10 @@ class AdminClientWithPoliciesIntegrationTest extends 
KafkaServerTestHarness with
     alterResult = client.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, topicResource3, 
brokerResource).asJava, alterResult.values.keySet)
-    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
+    assertFutureThrows(alterResult.values.get(topicResource1), 
classOf[PolicyViolationException])
     alterResult.values.get(topicResource2).get
-    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), 
classOf[InvalidConfigurationException])
-    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
+    assertFutureThrows(alterResult.values.get(topicResource3), 
classOf[InvalidConfigurationException])
+    assertFutureThrows(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
     assertTrue(validationsForResource(brokerResource).isEmpty,
       "Should not see the broker resource in the AlterConfig policy when the 
broker configs are not being updated.")
     validations.clear()
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0b24ba217ac..83d560960b2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1601,7 +1601,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   def testDescribeGroupApiWithNoGroupAcl(quorum: String): Unit = {
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
     val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
-    
TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), 
classOf[GroupAuthorizationException])
+    JTestUtils.assertFutureThrows(result.describedGroups().get(group), 
classOf[GroupAuthorizationException])
   }
 
   @ParameterizedTest
@@ -1687,14 +1687,14 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     consumer.assign(List(tp).asJava)
     consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
     val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
-    
TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), 
classOf[GroupAuthorizationException])
+    JTestUtils.assertFutureThrows(result.deletedGroups().get(group), 
classOf[GroupAuthorizationException])
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testDeleteGroupApiWithNoDeleteGroupAcl2(quorum: String): Unit = {
     val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
-    
TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), 
classOf[GroupAuthorizationException])
+    JTestUtils.assertFutureThrows(result.deletedGroups().get(group), 
classOf[GroupAuthorizationException])
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -1725,7 +1725,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
     consumer.close()
     val result = createAdminClient().deleteConsumerGroupOffsets(group, 
Set(tp).asJava)
-    TestUtils.assertFutureExceptionTypeEquals(result.all(), 
classOf[GroupAuthorizationException])
+    JTestUtils.assertFutureThrows(result.all(), 
classOf[GroupAuthorizationException])
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -1745,15 +1745,15 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, DELETE, ALLOW)), groupResource)
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), groupResource)
     val result = createAdminClient().deleteConsumerGroupOffsets(group, 
Set(tp).asJava)
-    TestUtils.assertFutureExceptionTypeEquals(result.all(), 
classOf[TopicAuthorizationException])
-    TestUtils.assertFutureExceptionTypeEquals(result.partitionResult(tp), 
classOf[TopicAuthorizationException])
+    JTestUtils.assertFutureThrows(result.all(), 
classOf[TopicAuthorizationException])
+    JTestUtils.assertFutureThrows(result.partitionResult(tp), 
classOf[TopicAuthorizationException])
   }
 
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testDeleteGroupOffsetsWithNoAcl(quorum: String): Unit = {
     val result = createAdminClient().deleteConsumerGroupOffsets(group, 
Set(tp).asJava)
-    TestUtils.assertFutureExceptionTypeEquals(result.all(), 
classOf[GroupAuthorizationException])
+    JTestUtils.assertFutureThrows(result.all(), 
classOf[GroupAuthorizationException])
   }
 
   @ParameterizedTest
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index 6a877bd6361..3fc63c59526 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.security.authorizer.AclEntry
+import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@@ -108,14 +109,14 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
     val failedCreateResult = client.createTopics(newTopics.asJava)
     val results = failedCreateResult.values()
     assertTrue(results.containsKey("mytopic"))
-    assertFutureExceptionTypeEquals(results.get("mytopic"), 
classOf[TopicExistsException])
+    assertFutureThrows(results.get("mytopic"), classOf[TopicExistsException])
     assertTrue(results.containsKey("mytopic2"))
-    assertFutureExceptionTypeEquals(results.get("mytopic2"), 
classOf[TopicExistsException])
+    assertFutureThrows(results.get("mytopic2"), classOf[TopicExistsException])
     assertTrue(results.containsKey("mytopic3"))
-    assertFutureExceptionTypeEquals(results.get("mytopic3"), 
classOf[TopicExistsException])
-    
assertFutureExceptionTypeEquals(failedCreateResult.numPartitions("mytopic3"), 
classOf[TopicExistsException])
-    
assertFutureExceptionTypeEquals(failedCreateResult.replicationFactor("mytopic3"),
 classOf[TopicExistsException])
-    assertFutureExceptionTypeEquals(failedCreateResult.config("mytopic3"), 
classOf[TopicExistsException])
+    assertFutureThrows(results.get("mytopic3"), classOf[TopicExistsException])
+    assertFutureThrows(failedCreateResult.numPartitions("mytopic3"), 
classOf[TopicExistsException])
+    assertFutureThrows(failedCreateResult.replicationFactor("mytopic3"), 
classOf[TopicExistsException])
+    assertFutureThrows(failedCreateResult.config("mytopic3"), 
classOf[TopicExistsException])
 
     val topicToDescription = 
client.describeTopics(topics.asJava).allTopicNames.get()
     assertEquals(topics.toSet, topicToDescription.keySet.asScala)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 2eb773c0614..b4d06d9f993 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, 
ServerLogConfigs, ZkConfigs}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogFileUtils}
-import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
+import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, 
assertFutureThrows}
 import org.apache.log4j.PropertyConfigurator
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@@ -726,7 +726,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val nonExistingTopic = "non-existing"
     val results = client.describeTopics(Seq(nonExistingTopic, 
existingTopic).asJava).topicNameValues()
     assertEquals(existingTopic, results.get(existingTopic).get.name)
-    assertFutureExceptionTypeEquals(results.get(nonExistingTopic), 
classOf[UnknownTopicOrPartitionException])
+    assertFutureThrows(results.get(nonExistingTopic), 
classOf[UnknownTopicOrPartitionException])
   }
 
   @ParameterizedTest
@@ -745,7 +745,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     val results = 
client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId, 
nonExistingTopicId).asJava)).topicIdValues()
     assertEquals(existingTopicId, results.get(existingTopicId).get.topicId())
-    assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), 
classOf[UnknownTopicIdException])
+    assertFutureThrows(results.get(nonExistingTopicId), 
classOf[UnknownTopicIdException])
   }
 
   @ParameterizedTest
@@ -1104,8 +1104,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       groupResource -> groupAlterConfigs
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
-    assertFutureExceptionTypeEquals(alterResult.values.get(groupResource), 
classOf[InvalidConfigurationException],
-      Some("consumer.session.timeout.ms must be greater than or equal to 
group.consumer.min.session.timeout.ms"))
+    assertFutureThrows(alterResult.values.get(groupResource), 
classOf[InvalidConfigurationException],
+      "consumer.session.timeout.ms must be greater than or equal to 
group.consumer.min.session.timeout.ms")
   }
 
   @ParameterizedTest
@@ -1707,10 +1707,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, 
"mytopic3", PatternType.LITERAL),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, 
AclPermissionType.ALLOW))
     client = createAdminClient
-    
assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(),
 classOf[SecurityDisabledException])
-    
assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(acl)).all(),
+    assertFutureThrows(client.describeAcls(AclBindingFilter.ANY).values(), 
classOf[SecurityDisabledException])
+    assertFutureThrows(client.createAcls(Collections.singleton(acl)).all(),
       classOf[SecurityDisabledException])
-    
assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(acl.toFilter())).all(),
+    
assertFutureThrows(client.deleteAcls(Collections.singleton(acl.toFilter())).all(),
       classOf[SecurityDisabledException])
   }
 
@@ -1727,7 +1727,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val future = client.createTopics(newTopics.asJava, new 
CreateTopicsOptions().validateOnly(true)).all()
     client.close(time.Duration.ofHours(2))
     val future2 = client.createTopics(newTopics.asJava, new 
CreateTopicsOptions().validateOnly(true)).all()
-    assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException])
+    assertFutureThrows(future2, classOf[IllegalStateException])
     future.get
     client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout 
should have no effect
   }
@@ -1747,7 +1747,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val future = client.createTopics(Seq("mytopic", "mytopic2").map(new 
NewTopic(_, 1, 1.toShort)).asJava,
       new CreateTopicsOptions().timeoutMs(900000)).all()
     client.close(time.Duration.ZERO)
-    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+    assertFutureThrows(future, classOf[TimeoutException])
   }
 
   /**
@@ -1764,7 +1764,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val startTimeMs = Time.SYSTEM.milliseconds()
     val future = client.createTopics(Seq("mytopic", "mytopic2").map(new 
NewTopic(_, 1, 1.toShort)).asJava,
       new CreateTopicsOptions().timeoutMs(2)).all()
-    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+    assertFutureThrows(future, classOf[TimeoutException])
     val endTimeMs = Time.SYSTEM.milliseconds()
     assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least 
one millisecond.")
   }
@@ -1782,7 +1782,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     client = KafkaAdminClientTest.createInternal(new 
AdminClientConfig(config), factory)
     val future = client.createTopics(Seq("mytopic", "mytopic2").map(new 
NewTopic(_, 1, 1.toShort)).asJava,
         new CreateTopicsOptions().validateOnly(true)).all()
-    assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
+    assertFutureThrows(future, classOf[TimeoutException])
     val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new 
NewTopic(_, 1, 1.toShort)).asJava,
       new CreateTopicsOptions().validateOnly(true)).all()
     future2.get
@@ -1982,9 +1982,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             Collections.singleton(new MemberToRemove(invalidInstanceId))
           ))
 
-          TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, 
classOf[UnknownMemberIdException])
+          assertFutureThrows(removeMembersResult.all, 
classOf[UnknownMemberIdException])
           val firstMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(invalidInstanceId))
-          TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, 
classOf[UnknownMemberIdException])
+          assertFutureThrows(firstMemberFuture, 
classOf[UnknownMemberIdException])
 
           // Test consumer group deletion
           var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
@@ -1992,12 +1992,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
           // Deleting the fake group ID should get GroupIdNotFoundException.
           assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
-          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+          assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
             classOf[GroupIdNotFoundException])
 
           // Deleting the real group ID should get GroupNotEmptyException
           assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
-          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+          assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
             classOf[GroupNotEmptyException])
 
           // Test delete one correct static member
@@ -2255,9 +2255,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
             Collections.singleton(new MemberToRemove(invalidInstanceId))
           ))
 
-          TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, 
classOf[UnknownMemberIdException])
+          assertFutureThrows(removeMembersResult.all, 
classOf[UnknownMemberIdException])
           val firstMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(invalidInstanceId))
-          TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, 
classOf[UnknownMemberIdException])
+          assertFutureThrows(firstMemberFuture, 
classOf[UnknownMemberIdException])
 
           // Test consumer group deletion
           var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
@@ -2265,12 +2265,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
           // Deleting the fake group ID should get GroupIdNotFoundException.
           assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
-          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+          assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
             classOf[GroupIdNotFoundException])
 
           // Deleting the real group ID should get GroupNotEmptyException
           assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
-          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+          assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
             classOf[GroupNotEmptyException])
 
           // Test delete one correct static member
@@ -2379,29 +2379,29 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         val offsetDeleteResult = 
client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
 
         // Top level error will equal to the first partition level error
-        assertFutureExceptionTypeEquals(offsetDeleteResult.all(), 
classOf[GroupSubscribedToTopicException])
-        
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp1),
+        assertFutureThrows(offsetDeleteResult.all(), 
classOf[GroupSubscribedToTopicException])
+        assertFutureThrows(offsetDeleteResult.partitionResult(tp1),
           classOf[GroupSubscribedToTopicException])
-        
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
+        assertFutureThrows(offsetDeleteResult.partitionResult(tp2),
           classOf[UnknownTopicOrPartitionException])
 
         // Test the fake group ID
         val fakeDeleteResult = client.deleteConsumerGroupOffsets(fakeGroupId, 
Set(tp1, tp2).asJava)
 
-        assertFutureExceptionTypeEquals(fakeDeleteResult.all(), 
classOf[GroupIdNotFoundException])
-        assertFutureExceptionTypeEquals(fakeDeleteResult.partitionResult(tp1),
+        assertFutureThrows(fakeDeleteResult.all(), 
classOf[GroupIdNotFoundException])
+        assertFutureThrows(fakeDeleteResult.partitionResult(tp1),
           classOf[GroupIdNotFoundException])
-        assertFutureExceptionTypeEquals(fakeDeleteResult.partitionResult(tp2),
+        assertFutureThrows(fakeDeleteResult.partitionResult(tp2),
           classOf[GroupIdNotFoundException])
       }
 
       // Test offset deletion when group is empty
       val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, 
Set(tp1, tp2).asJava)
 
-      assertFutureExceptionTypeEquals(offsetDeleteResult.all(),
+      assertFutureThrows(offsetDeleteResult.all(),
         classOf[UnknownTopicOrPartitionException])
       assertNull(offsetDeleteResult.partitionResult(tp1).get())
-      assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
+      assertFutureThrows(offsetDeleteResult.partitionResult(tp2),
         classOf[UnknownTopicOrPartitionException])
     } finally {
       Utils.closeQuietly(client, "adminClient")
@@ -3213,8 +3213,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       topic1Resource -> topic1AlterConfigs
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
-      Some("Invalid value zip for configuration compression.type"))
+    assertFutureThrows(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
+      "Invalid value zip for configuration compression.type: String must be 
one of: uncompressed, zstd, lz4, snappy, gzip, producer")
   }
 
   @ParameterizedTest
@@ -3362,8 +3362,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertEquals(Set(topic1Resource, topic2Resource).asJava, 
alterResult.values.keySet)
 
     // InvalidRequestException error for topic1
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidRequestException],
-      Some("Error due to duplicate config keys"))
+    assertFutureThrows(alterResult.values().get(topic1Resource), 
classOf[InvalidRequestException],
+      "Error due to duplicate config keys")
 
     // Operation should succeed for topic2
     alterResult.values().get(topic2Resource).get()
@@ -3393,11 +3393,11 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource, topic2Resource).asJava, 
alterResult.values.keySet)
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
-      Some("Can't APPEND to key compression.type because its type is not 
LIST."))
+    assertFutureThrows(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
+      "Can't APPEND to key compression.type because its type is not LIST.")
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), 
classOf[InvalidConfigurationException],
-      Some("Can't SUBTRACT to key compression.type because its type is not 
LIST."))
+    assertFutureThrows(alterResult.values().get(topic2Resource), 
classOf[InvalidConfigurationException],
+      "Can't SUBTRACT to key compression.type because its type is not LIST.")
 
     // Try to add invalid config
     topic1AlterConfigs = Seq(
@@ -3409,8 +3409,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     ).asJava)
     assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet)
 
-    assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
-      Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: 
Value must be no more than 1"))
+    assertFutureThrows(alterResult.values().get(topic1Resource), 
classOf[InvalidConfigurationException],
+      "Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value 
must be no more than 1")
   }
 
   @ParameterizedTest
@@ -3437,8 +3437,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       nonExistentTp1 -> validAssignment,
       nonExistentTp2 -> validAssignment
     ).asJava).values()
-    
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1),
 classOf[UnknownTopicOrPartitionException])
-    
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2),
 classOf[UnknownTopicOrPartitionException])
+    assertFutureThrows(nonExistentPartitionsResult.get(nonExistentTp1), 
classOf[UnknownTopicOrPartitionException])
+    assertFutureThrows(nonExistentPartitionsResult.get(nonExistentTp2), 
classOf[UnknownTopicOrPartitionException])
 
     val extraNonExistentReplica = Optional.of(new NewPartitionReassignment((0 
until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava))
     val negativeIdReplica = Optional.of(new NewPartitionReassignment(Seq(-3, 
-2, -1).map(_.asInstanceOf[Integer]).asJava))
@@ -3448,9 +3448,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       tp2 -> negativeIdReplica,
       tp3 -> duplicateReplica
     ).asJava).values()
-    assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), 
classOf[InvalidReplicaAssignmentException])
-    assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), 
classOf[InvalidReplicaAssignmentException])
-    assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), 
classOf[InvalidReplicaAssignmentException])
+    assertFutureThrows(invalidReplicaResult.get(tp1), 
classOf[InvalidReplicaAssignmentException])
+    assertFutureThrows(invalidReplicaResult.get(tp2), 
classOf[InvalidReplicaAssignmentException])
+    assertFutureThrows(invalidReplicaResult.get(tp3), 
classOf[InvalidReplicaAssignmentException])
   }
 
   @ParameterizedTest
@@ -3465,8 +3465,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     assertTrue(results.containsKey(longTopicName))
     results.get(longTopicName).get()
     assertTrue(results.containsKey(invalidTopicName))
-    assertFutureExceptionTypeEquals(results.get(invalidTopicName), 
classOf[InvalidTopicException])
-    assertFutureExceptionTypeEquals(client.alterReplicaLogDirs(
+    assertFutureThrows(results.get(invalidTopicName), 
classOf[InvalidTopicException])
+    assertFutureThrows(client.alterReplicaLogDirs(
       Map(new TopicPartitionReplica(longTopicName, 0, 0) -> 
brokers(0).config.logDirs(0)).asJava).all(),
       classOf[InvalidTopicException])
     client.close()
@@ -3493,10 +3493,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       TopicConfig.COMPRESSION_TYPE_CONFIG -> "producer"
     ).asJava
     val newTopic = new NewTopic(topic, 2, brokerCount.toShort)
-    assertFutureExceptionTypeEquals(
+    assertFutureThrows(
       
client.createTopics(Collections.singletonList(newTopic.configs(invalidConfigs))).all,
       classOf[InvalidConfigurationException],
-      Some("Null value not supported for topic configs: retention.bytes")
+      "Null value not supported for topic configs: retention.bytes"
     )
 
     val validConfigs = Map[String, String](TopicConfig.COMPRESSION_TYPE_CONFIG 
-> "producer").asJava
@@ -3509,10 +3509,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, 
null), AlterConfigOp.OpType.SET),
       new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, 
"lz4"), AlterConfigOp.OpType.SET)
     )
-    assertFutureExceptionTypeEquals(
+    assertFutureThrows(
       client.incrementalAlterConfigs(Map(topicResource -> 
alterOps.asJavaCollection).asJava).all,
       classOf[InvalidRequestException],
-      Some("Null value not supported for : retention.bytes")
+      "Null value not supported for : retention.bytes"
     )
     validateLogConfig(compressionType = "producer")
   }
@@ -3932,9 +3932,9 @@ object PlaintextAdminIntegrationTest {
     var alterResult = admin.incrementalAlterConfigs(alterConfigs)
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
-    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
+    assertFutureThrows(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
-    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
+    assertFutureThrows(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
 
     // Verify that first and third resources were not updated and second was 
updated
     test.ensureConsistentKRaftMetadata()
@@ -3961,9 +3961,9 @@ object PlaintextAdminIntegrationTest {
     alterResult = admin.incrementalAlterConfigs(alterConfigs, new 
AlterConfigsOptions().validateOnly(true))
 
     assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, 
alterResult.values.keySet)
-    assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
+    assertFutureThrows(alterResult.values.get(topicResource1), 
classOf[InvalidConfigurationException])
     alterResult.values.get(topicResource2).get
-    assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
+    assertFutureThrows(alterResult.values.get(brokerResource), 
classOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     test.ensureConsistentKRaftMetadata()
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 14d8bbdcaac..3b08e1465f1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -34,6 +34,7 @@ import 
org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConf
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer
 import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
 import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
@@ -216,7 +217,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, 
AclPermissionType.ALLOW))
     val results2 = client.createAcls(List(aclUnknown).asJava)
     assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
-    assertFutureExceptionTypeEquals(results2.all, 
classOf[InvalidRequestException])
+    assertFutureThrows(results2.all, classOf[InvalidRequestException])
     val results3 = client.deleteAcls(List(acl.toFilter, acl2.toFilter, 
acl3.toFilter).asJava).values
     assertEquals(Set(acl.toFilter, acl2.toFilter, acl3.toFilter), 
results3.keySet.asScala)
     assertEquals(0, results3.get(acl.toFilter).get.values.size())
@@ -403,8 +404,8 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, 
AclPermissionType.ALLOW))
     val results = client.createAcls(List(clusterAcl, 
emptyResourceNameAcl).asJava, new CreateAclsOptions())
     assertEquals(Set(clusterAcl, emptyResourceNameAcl), 
results.values.keySet().asScala)
-    assertFutureExceptionTypeEquals(results.values.get(clusterAcl), 
classOf[InvalidRequestException])
-    assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), 
classOf[InvalidRequestException])
+    assertFutureThrows(results.values.get(clusterAcl), 
classOf[InvalidRequestException])
+    assertFutureThrows(results.values.get(emptyResourceNameAcl), 
classOf[InvalidRequestException])
   }
 
   @ParameterizedTest
@@ -601,9 +602,9 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
       assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, compressionConfig.value)
       assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG, 
compressionConfig.source)
 
-      assertFutureExceptionTypeEquals(result.numPartitions(topic2), 
classOf[TopicAuthorizationException])
-      assertFutureExceptionTypeEquals(result.replicationFactor(topic2), 
classOf[TopicAuthorizationException])
-      assertFutureExceptionTypeEquals(result.config(topic2), 
classOf[TopicAuthorizationException])
+      assertFutureThrows(result.numPartitions(topic2), 
classOf[TopicAuthorizationException])
+      assertFutureThrows(result.replicationFactor(topic2), 
classOf[TopicAuthorizationException])
+      assertFutureThrows(result.config(topic2), 
classOf[TopicAuthorizationException])
     }
     validateMetadataAndConfigs(validateResult)
 
@@ -614,7 +615,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     val topicIds = getTopicIds()
     assertNotEquals(Uuid.ZERO_UUID, createResult.topicId(topic1).get())
     assertEquals(topicIds(topic1), createResult.topicId(topic1).get())
-    assertFutureExceptionTypeEquals(createResult.topicId(topic2), 
classOf[TopicAuthorizationException])
+    assertFutureThrows(createResult.topicId(topic2), 
classOf[TopicAuthorizationException])
     
     val createResponseConfig = 
createResult.config(topic1).get().entries.asScala
 
@@ -637,7 +638,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     val createDelegationTokenOptions = new 
CreateDelegationTokenOptions().maxLifetimeMs(5000)
 
     // Test expiration for non-exists token
-    TestUtils.assertFutureExceptionTypeEquals(
+    assertFutureThrows(
       client.expireDelegationToken("".getBytes()).expiryTimestamp(),
       classOf[DelegationTokenNotFoundException]
     )
@@ -650,7 +651,7 @@ class SaslSslAdminIntegrationTest extends 
BaseAdminIntegrationTest with SaslSetu
     val token2 = 
client.createDelegationToken(createDelegationTokenOptions.maxLifetimeMs(1000)).delegationToken().get()
     // Ensure current time > maxLifeTimeMs of token
     Thread.sleep(1000)
-    TestUtils.assertFutureExceptionTypeEquals(
+    assertFutureThrows(
       client.expireDelegationToken(token2.hmac(), new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(1)).expiryTimestamp(),
       classOf[DelegationTokenExpiredException]
     )
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index eb63f4b521e..eaca2cc2c95 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
 import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
@@ -320,7 +321,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
       val resource = new ConfigResource(ConfigResource.Type.TOPIC, "")
       val op = new AlterConfigOp(new 
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "200000"), OpType.SET)
       val future = admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all
-      TestUtils.assertFutureExceptionTypeEquals(future, 
classOf[InvalidRequestException])
+      assertFutureThrows(future, classOf[InvalidRequestException])
     } finally {
       admin.close()
     }
@@ -471,7 +472,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
       val resource = new ConfigResource(ConfigResource.Type.GROUP, "")
       val op = new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "200000"), 
OpType.SET)
       val future = admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all
-      TestUtils.assertFutureExceptionTypeEquals(future, 
classOf[InvalidRequestException])
+      assertFutureThrows(future, classOf[InvalidRequestException])
     } finally {
       admin.close()
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 298168a5310..bb25bc9379c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1558,15 +1558,6 @@ object TestUtils extends Logging {
     )
   }
 
-  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ 
<: Throwable],
-                                      expectedErrorMessage: Option[String] = 
None): Unit = {
-    val cause = assertThrows(classOf[ExecutionException], () => 
future.get()).getCause
-    assertTrue(clazz.isInstance(cause), "Expected an exception of type " + 
clazz.getName + "; got type " +
-      cause.getClass.getName)
-    expectedErrorMessage.foreach(message => 
assertTrue(cause.getMessage.contains(message), s"Received error message : 
${cause.getMessage}" +
-      s" does not contain expected error message : $message"))
-  }
-
   def assertBadConfigContainingMessage(props: Properties, 
expectedExceptionContainsText: String): Unit = {
     try {
       KafkaConfig.fromProps(props)


Reply via email to