This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 486f65e8c60 KAFKA-18100 `Using` block suppresses all errors (#17954)
486f65e8c60 is described below

commit 486f65e8c60d1126b1b62180b6b66352e2783907
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Nov 28 03:25:19 2024 +0800

    KAFKA-18100 `Using` block suppresses all errors (#17954)
    
    https://github.com/apache/kafka/pull/15881 changed our tests to use 
`Using` blocks. But `Using(...)` returns a `Try` instead of rethrowing, so if 
an assertion fails within the block, the test will still pass.
    
    We should either check the result using a corresponding `match` block with 
`Success(_)` and `Failure(e)`, use `Using.resource`, or use `try`/`finally` 
blocks to clean up resources.
    
    See https://www.scala-lang.org/api/3.0.2/scala/util/Using$.html
    
    Co-authored-by: frankvicky <[email protected]>
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 ...onTokenEndToEndAuthorizationWithOwnerTest.scala |   6 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   2 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   5 +-
 .../scala/integration/kafka/api/SaslSetup.scala    |   2 +-
 .../kafka/server/RaftClusterSnapshotTest.scala     |   4 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 146 +++++++--------------
 .../group/CoordinatorLoaderImplTest.scala          |  27 ++--
 .../kafka/integration/KafkaServerTestHarness.scala |   6 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   4 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   2 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   2 +-
 11 files changed, 79 insertions(+), 127 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
index d886d52ee9a..833b06654d3 100644
--- 
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
@@ -66,7 +66,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest 
extends DelegationTokenE
 
   override def configureSecurityAfterServersStart(): Unit = {
     // Create the Acls before calling super which will create the additiona 
tokens
-    Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+    Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
       superuserAdminClient.createAcls(List(AclTokenOtherDescribe, 
AclTokenCreate, AclTokenDescribe).asJava).values
 
       brokers.foreach { s =>
@@ -106,8 +106,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest 
extends DelegationTokenE
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testDescribeTokenForOtherUserFails(quorum: String): Unit = {
-    Using(createScramAdminClient(kafkaClientSaslMechanism, 
describeTokenFailPrincipal.getName, describeTokenFailPassword)) { 
describeTokenFailAdminClient =>
-      Using(createScramAdminClient(kafkaClientSaslMechanism, 
otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
+    Using.resource(createScramAdminClient(kafkaClientSaslMechanism, 
describeTokenFailPrincipal.getName, describeTokenFailPassword)) { 
describeTokenFailAdminClient =>
+      Using.resource(createScramAdminClient(kafkaClientSaslMechanism, 
otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
         otherClientAdminClient.createDelegationToken().delegationToken().get()
         val tokens = describeTokenFailAdminClient.describeDelegationToken(
           new 
DescribeDelegationTokenOptions().owners(Collections.singletonList(otherClientPrincipal))
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 0d0173b6121..df3352e0076 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2388,7 +2388,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       
newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
Integer.MAX_VALUE.toString)
       newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
 
-      Using(createConsumer(configOverrides = newConsumerConfig)) { consumer =>
+      Using.resource(createConsumer(configOverrides = newConsumerConfig)) { 
consumer =>
         consumer.subscribe(Collections.singletonList(testTopicName))
         val records = consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
         assertNotEquals(0, records.count)
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index af3f030648f..20435a130e4 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -26,6 +26,7 @@ import 
org.apache.kafka.common.errors.SaslAuthenticationException
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
 import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -74,8 +75,10 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   override def setUp(testInfo: TestInfo): Unit = {
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
+    val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
+    superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, 
superuserLoginContext)
     super.setUp(testInfo)
-    Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+    Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
       TestUtils.createTopicWithAdmin(
         superuserAdminClient, topic, brokers, controllerServers, numPartitions
       )
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index a9eb15a24b2..a38a9189ce6 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -204,7 +204,7 @@ trait SaslSetup {
 
   def createScramCredentials(zkConnect: String, userName: String, password: 
String): Unit = {
     val zkClientConfig = new ZKClientConfig()
-    Using(KafkaZkClient(
+    Using.resource(KafkaZkClient(
       zkConnect, JaasUtils.isZkSaslEnabled || 
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
       Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = 
zkClientConfig, enableEntityConfigControllerCheck = false)) { zkClient =>
       val adminZkClient = new AdminZkClient(zkClient)
diff --git 
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala 
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index a8919605f2b..ad47da549ff 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -41,7 +41,7 @@ class RaftClusterSnapshotTest {
     val numberOfBrokers = 3
     val numberOfControllers = 3
 
-    Using(
+    Using.resource(
       new KafkaClusterTestKit
         .Builder(
           new TestKitNodes.Builder()
@@ -74,7 +74,7 @@ class RaftClusterSnapshotTest {
 
       // For every controller and broker perform some sanity checks against 
the latest snapshot
       for ((_, raftManager) <- cluster.raftManagers().asScala) {
-        Using(
+        Using.resource(
           RecordsSnapshotReader.of(
             raftManager.replicatedLog.latestSnapshot.get(),
             new MetadataRecordSerde(),
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala 
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index cca56f7aa96..1c65fd5073c 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -124,10 +124,7 @@ final class KafkaMetadataLogTest {
 
     append(log, numberOfRecords, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
 
     assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes())
   }
@@ -211,9 +208,7 @@ final class KafkaMetadataLogTest {
 
     append(log, numberOfRecords, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
 
     // Simulate log cleanup that advances the LSO
     log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, 
LogStartOffsetIncrementReason.SegmentDeletion)
@@ -246,10 +241,7 @@ final class KafkaMetadataLogTest {
 
     append(log, numberOfRecords, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
 
     assertThrows(
       classOf[IllegalArgumentException],
@@ -295,10 +287,7 @@ final class KafkaMetadataLogTest {
 
     append(log, numberOfRecords, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
 
     assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
       "Creating an existing snapshot should not do anything")
@@ -342,10 +331,7 @@ final class KafkaMetadataLogTest {
     val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch)
 
     append(log, numberOfRecords, epoch)
-
-    Using(log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) { 
snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, sameEpochSnapshotId)
 
     assertTrue(log.truncateToLatestSnapshot())
     assertEquals(sameEpochSnapshotId.offset, log.startOffset)
@@ -356,10 +342,7 @@ final class KafkaMetadataLogTest {
     val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch 
+ 1)
 
     append(log, numberOfRecords, epoch)
-
-    Using(log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) { 
snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, greaterEpochSnapshotId)
 
     assertTrue(log.truncateToLatestSnapshot())
     assertEquals(greaterEpochSnapshotId.offset, log.startOffset)
@@ -376,27 +359,18 @@ final class KafkaMetadataLogTest {
 
     append(log, 1, epoch - 1)
     val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
-    Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, oldSnapshotId1)
 
     append(log, 1, epoch)
     val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
-    Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, oldSnapshotId2)
 
     append(log, numberOfRecords - 2, epoch)
     val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, oldSnapshotId3)
 
     val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
-    append(log, numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot 
=>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, greaterSnapshotId)
 
     assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId())
     assertTrue(log.truncateToLatestSnapshot())
@@ -487,7 +461,7 @@ final class KafkaMetadataLogTest {
     metadataDir: File,
     snapshotId: OffsetAndEpoch
   ): Unit = {
-    Using(FileRawSnapshotWriter.create(metadataDir.toPath, 
snapshotId))(_.freeze())
+    Using.resource(FileRawSnapshotWriter.create(metadataDir.toPath, 
snapshotId))(_.freeze())
   }
 
   @Test
@@ -499,18 +473,14 @@ final class KafkaMetadataLogTest {
     append(log, numberOfRecords, epoch)
 
     val olderEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch - 1)
-    Using(log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) { 
snapshot =>
-      snapshot.freeze()
-    }
-
+    createNewSnapshotUnckecked(log, olderEpochSnapshotId)
     assertFalse(log.truncateToLatestSnapshot())
 
     append(log, numberOfRecords, epoch)
 
+
     val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) { 
snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, olderOffsetSnapshotId)
 
     assertFalse(log.truncateToLatestSnapshot())
   }
@@ -523,10 +493,7 @@ final class KafkaMetadataLogTest {
     val snapshotId = new OffsetAndEpoch(1, epoch)
 
     append(log, numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
-
+    createNewSnapshotUnckecked(log, snapshotId)
     log.close()
 
     // Create a few partial snapshots
@@ -560,27 +527,19 @@ final class KafkaMetadataLogTest {
 
     append(log, 1, epoch - 1)
     val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
-    Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, oldSnapshotId1)
 
     append(log, 1, epoch)
     val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
-    Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, oldSnapshotId2)
 
     append(log, numberOfRecords - 2, epoch)
     val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, oldSnapshotId3)
 
     val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
     append(log, numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot 
=>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, greaterSnapshotId)
 
     log.close()
 
@@ -609,9 +568,7 @@ final class KafkaMetadataLogTest {
     val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1)
 
     append(log, numberOfRecords, epoch)
-    Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId)
 
     log.close()
 
@@ -707,9 +664,7 @@ final class KafkaMetadataLogTest {
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
 
     val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch - 1)
     assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -727,9 +682,8 @@ final class KafkaMetadataLogTest {
     log.updateHighWatermark(new LogOffsetMetadata(offset))
 
     val snapshotId = new OffsetAndEpoch(offset, epoch)
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
+
     // Simulate log cleaning advancing the LSO
     log.log.maybeIncrementLogStartOffset(offset, 
LogStartOffsetIncrementReason.SegmentDeletion)
 
@@ -749,9 +703,7 @@ final class KafkaMetadataLogTest {
     log.updateHighWatermark(new LogOffsetMetadata(offset))
 
     val snapshotId = new OffsetAndEpoch(offset, epoch)
-    Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshot(log, snapshotId)
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
     assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
@@ -766,9 +718,7 @@ final class KafkaMetadataLogTest {
     val log = buildMetadataLog(tempDir, mockTime)
     log.updateHighWatermark(new LogOffsetMetadata(offset))
     val snapshotId = new OffsetAndEpoch(offset, 1)
-    Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId)
     log.truncateToLatestSnapshot()
 
 
@@ -790,9 +740,7 @@ final class KafkaMetadataLogTest {
     val log = buildMetadataLog(tempDir, mockTime)
     log.updateHighWatermark(new LogOffsetMetadata(offset))
     val snapshotId = new OffsetAndEpoch(offset, 1)
-    Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId)
     log.truncateToLatestSnapshot()
 
     append(log, numOfRecords, epoch = 3)
@@ -872,16 +820,10 @@ final class KafkaMetadataLogTest {
     assertFalse(log.maybeClean(), "Should not clean since no snapshots exist")
 
     val snapshotId1 = new OffsetAndEpoch(1000, 1)
-    Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
-      append(snapshot, 100)
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId1)
 
     val snapshotId2 = new OffsetAndEpoch(2000, 1)
-    Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
-      append(snapshot, 100)
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId2)
 
     val lsoBefore = log.startOffset()
     assertTrue(log.maybeClean(), "Expected to clean since there was at least 
one snapshot")
@@ -910,10 +852,7 @@ final class KafkaMetadataLogTest {
 
     for (offset <- Seq(100, 200, 300, 400, 500, 600)) {
       val snapshotId = new OffsetAndEpoch(offset, 1)
-      Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
-        append(snapshot, 10)
-        snapshot.freeze()
-      }
+      createNewSnapshotUnckecked(log, snapshotId)
     }
 
     assertEquals(6, log.snapshotCount())
@@ -945,14 +884,14 @@ final class KafkaMetadataLogTest {
 
     // Then generate two snapshots
     val snapshotId1 = new OffsetAndEpoch(1000, 1)
-    Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
+    Using.resource(log.createNewSnapshotUnchecked(snapshotId1).get()) { 
snapshot =>
       append(snapshot, 500)
       snapshot.freeze()
     }
 
     // Then generate a snapshot
     val snapshotId2 = new OffsetAndEpoch(2000, 1)
-    Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
+    Using.resource(log.createNewSnapshotUnchecked(snapshotId2).get()) { 
snapshot =>
       append(snapshot, 500)
       snapshot.freeze()
     }
@@ -992,17 +931,14 @@ final class KafkaMetadataLogTest {
       log.log.logSegments.asScala.drop(1).head.baseOffset,
       1
     )
-    Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId1)
+
     // Generate second snapshots that includes the second segment by using the 
base offset of the third segment
     val snapshotId2 = new OffsetAndEpoch(
       log.log.logSegments.asScala.drop(2).head.baseOffset,
       1
     )
-    Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
-      snapshot.freeze()
-    }
+    createNewSnapshotUnckecked(log, snapshotId2)
 
     // Sleep long enough to trigger a possible segment delete because of the 
default retention
     val defaultLogRetentionMs = LogConfig.DEFAULT_RETENTION_MS * 2
@@ -1074,6 +1010,18 @@ object KafkaMetadataLogTest {
     log
   }
 
+  def createNewSnapshot(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch): 
Unit = {
+    Using.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
+      snapshot.freeze()
+    }
+  }
+
+  def createNewSnapshotUnckecked(log: KafkaMetadataLog, snapshotId: 
OffsetAndEpoch): Unit = {
+    Using.resource(log.createNewSnapshotUnchecked(snapshotId).get()) { 
snapshot =>
+      snapshot.freeze()
+    }
+  }
+
   def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): 
LogAppendInfo = {
     log.appendAsLeader(
       MemoryRecords.withRecords(
@@ -1103,4 +1051,4 @@ object KafkaMetadataLogTest {
     }
     dir
   }
-}
+}
\ No newline at end of file
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index 70397447f5c..68a6ba5da1d 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -59,7 +59,7 @@ class CoordinatorLoaderImplTest {
     val serde = mock(classOf[Deserializer[(String, String)]])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -79,7 +79,7 @@ class CoordinatorLoaderImplTest {
     val serde = mock(classOf[Deserializer[(String, String)]])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -100,7 +100,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -203,7 +203,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -246,7 +246,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -286,7 +286,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -312,7 +312,8 @@ class CoordinatorLoaderImplTest {
         .thenThrow(new RuntimeException("Error!"))
 
       val ex = assertFutureThrows(loader.load(tp, coordinator), 
classOf[RuntimeException])
-      assertEquals("Error!", ex.getMessage)
+
+      assertEquals(s"Deserializing record DefaultRecord(offset=0, 
timestamp=-1, key=2 bytes, value=2 bytes) from $tp failed due to: Error!", 
ex.getMessage)
     }
   }
 
@@ -327,7 +328,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -359,7 +360,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
     val time = new MockTime()
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -414,7 +415,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -489,7 +490,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -515,7 +516,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
@@ -591,7 +592,7 @@ class CoordinatorLoaderImplTest {
     val log = mock(classOf[UnifiedLog])
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
-    Using(new CoordinatorLoaderImpl[(String, String)](
+    Using.resource(new CoordinatorLoaderImpl[(String, String)](
       time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 0e004b44271..8a8772ea08d 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -160,7 +160,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     adminClientConfig: Properties = new Properties
   ): Unit = {
     if (isKRaftTest()) {
-      Using(createAdminClient(brokers, listenerName, adminClientConfig)) { 
admin =>
+      Using.resource(createAdminClient(brokers, listenerName, 
adminClientConfig)) { admin =>
         TestUtils.createOffsetsTopicWithAdmin(admin, brokers, 
controllerServers)
       }
     } else {
@@ -239,7 +239,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
     listenerName: ListenerName = listenerName
   ): Unit = {
     if (isKRaftTest()) {
-      Using(createAdminClient(brokers, listenerName)) { admin =>
+      Using.resource(createAdminClient(brokers, listenerName)) { admin =>
         TestUtils.deleteTopicWithAdmin(
           admin = admin,
           topic = topic,
@@ -433,7 +433,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
 
   def changeClientIdConfig(sanitizedClientId: String, configs: Properties): 
Unit = {
     if (isKRaftTest()) {
-      Using(createAdminClient(brokers, listenerName)) {
+      Using.resource(createAdminClient(brokers, listenerName)) {
         admin => {
           admin.alterClientQuotas(Collections.singleton(
             new ClientQuotaAlteration(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 64ef5641f23..7ac8966d363 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -111,7 +111,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
     //replicate for each of the two follower brokers.
     if (!leaderThrottle) throttle = throttle * 3
 
-    Using(createAdminClient(brokers, listenerName)) { admin =>
+    Using.resource(createAdminClient(brokers, listenerName)) { admin =>
       (106 to 107).foreach(registerBroker)
       admin.createTopics(List(new NewTopic(topic, assignment.map(a => 
a._1.asInstanceOf[Integer] ->
         
a._2.map(_.asInstanceOf[Integer]).toList.asJava).asJava)).asJava).all().get()
@@ -212,7 +212,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
     val expectedDuration = 4
     val throttle: Long = msg.length * msgCount / expectedDuration
 
-    Using(createAdminClient(brokers, listenerName)) { admin =>
+    Using.resource(createAdminClient(brokers, listenerName)) { admin =>
       registerBroker(101)
       admin.createTopics(
         List(new NewTopic(topic, Collections.singletonMap(0, List(100, 
101).map(_.asInstanceOf[Integer]).asJava))).asJava
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index b943dd5bff6..4ac571f452a 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -283,7 +283,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness 
with Logging {
   }
 
   private def createTopic(topic: String, partitionReplicaAssignment: 
collection.Map[Int, Seq[Int]]): Unit = {
-    Using(createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
+    Using.resource(createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
       try {
         TestUtils.createTopicWithAdmin(
           admin = admin,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala 
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 984b3a8eb8c..9bbfa7242c3 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -560,7 +560,7 @@ class DumpLogSegmentsTest {
 
     val lastContainedLogTimestamp = 10000
 
-    Using(
+    Using.resource(
       new RecordsSnapshotWriter.Builder()
         .setTime(new MockTime)
         .setLastContainedLogTimestamp(lastContainedLogTimestamp)

Reply via email to