This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 486f65e8c60 KAFKA-18100 `Using` block suppresses all errors (#17954)
486f65e8c60 is described below
commit 486f65e8c60d1126b1b62180b6b66352e2783907
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Nov 28 03:25:19 2024 +0800
KAFKA-18100 `Using` block suppresses all errors (#17954)
https://github.com/apache/kafka/pull/15881 changed our tests to utilize
`Using` blocks. But these blocks return a `Try` and don't throw any errors, so
if there is a failed assertion within the block, the test will still pass.
We should either check the failure using a corresponding `match` block with
Success(_) and Failure(e), use `Using.resource`, or use try/finally blocks to
clean up resources.
See https://www.scala-lang.org/api/3.0.2/scala/util/Using$.html
Co-authored-by: frankvicky <[email protected]>
Reviewers: Chia-Ping Tsai <[email protected]>
---
...onTokenEndToEndAuthorizationWithOwnerTest.scala | 6 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 2 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 5 +-
.../scala/integration/kafka/api/SaslSetup.scala | 2 +-
.../kafka/server/RaftClusterSnapshotTest.scala | 4 +-
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 146 +++++++--------------
.../group/CoordinatorLoaderImplTest.scala | 27 ++--
.../kafka/integration/KafkaServerTestHarness.scala | 6 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 4 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 2 +-
11 files changed, 79 insertions(+), 127 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
index d886d52ee9a..833b06654d3 100644
---
a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationWithOwnerTest.scala
@@ -66,7 +66,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest
extends DelegationTokenE
override def configureSecurityAfterServersStart(): Unit = {
// Create the Acls before calling super which will create the additional
tokens
- Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+ Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
superuserAdminClient.createAcls(List(AclTokenOtherDescribe,
AclTokenCreate, AclTokenDescribe).asJava).values
brokers.foreach { s =>
@@ -106,8 +106,8 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest
extends DelegationTokenE
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testDescribeTokenForOtherUserFails(quorum: String): Unit = {
- Using(createScramAdminClient(kafkaClientSaslMechanism,
describeTokenFailPrincipal.getName, describeTokenFailPassword)) {
describeTokenFailAdminClient =>
- Using(createScramAdminClient(kafkaClientSaslMechanism,
otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
+ Using.resource(createScramAdminClient(kafkaClientSaslMechanism,
describeTokenFailPrincipal.getName, describeTokenFailPassword)) {
describeTokenFailAdminClient =>
+ Using.resource(createScramAdminClient(kafkaClientSaslMechanism,
otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
otherClientAdminClient.createDelegationToken().delegationToken().get()
val tokens = describeTokenFailAdminClient.describeDelegationToken(
new
DescribeDelegationTokenOptions().owners(Collections.singletonList(otherClientPrincipal))
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 0d0173b6121..df3352e0076 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2388,7 +2388,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
Integer.MAX_VALUE.toString)
newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString)
- Using(createConsumer(configOverrides = newConsumerConfig)) { consumer =>
+ Using.resource(createConsumer(configOverrides = newConsumerConfig)) {
consumer =>
consumer.subscribe(Collections.singletonList(testTopicName))
val records = consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))
assertNotEquals(0, records.count)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index af3f030648f..20435a130e4 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -26,6 +26,7 @@ import
org.apache.kafka.common.errors.SaslAuthenticationException
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -74,8 +75,10 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms,
Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
+ val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
+ superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
superuserLoginContext)
super.setUp(testInfo)
- Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+ Using.resource(createPrivilegedAdminClient()) { superuserAdminClient =>
TestUtils.createTopicWithAdmin(
superuserAdminClient, topic, brokers, controllerServers, numPartitions
)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index a9eb15a24b2..a38a9189ce6 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -204,7 +204,7 @@ trait SaslSetup {
def createScramCredentials(zkConnect: String, userName: String, password:
String): Unit = {
val zkClientConfig = new ZKClientConfig()
- Using(KafkaZkClient(
+ Using.resource(KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled ||
KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig =
zkClientConfig, enableEntityConfigControllerCheck = false)) { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
diff --git
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index a8919605f2b..ad47da549ff 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -41,7 +41,7 @@ class RaftClusterSnapshotTest {
val numberOfBrokers = 3
val numberOfControllers = 3
- Using(
+ Using.resource(
new KafkaClusterTestKit
.Builder(
new TestKitNodes.Builder()
@@ -74,7 +74,7 @@ class RaftClusterSnapshotTest {
// For every controller and broker perform some sanity checks against
the latest snapshot
for ((_, raftManager) <- cluster.raftManagers().asScala) {
- Using(
+ Using.resource(
RecordsSnapshotReader.of(
raftManager.replicatedLog.latestSnapshot.get(),
new MetadataRecordSerde(),
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index cca56f7aa96..1c65fd5073c 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -124,10 +124,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes())
}
@@ -211,9 +208,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
// Simulate log cleanup that advances the LSO
log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1,
LogStartOffsetIncrementReason.SegmentDeletion)
@@ -246,10 +241,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
assertThrows(
classOf[IllegalArgumentException],
@@ -295,10 +287,7 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
-
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
"Creating an existing snapshot should not do anything")
@@ -342,10 +331,7 @@ final class KafkaMetadataLogTest {
val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
-
- Using(log.createNewSnapshotUnchecked(sameEpochSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, sameEpochSnapshotId)
assertTrue(log.truncateToLatestSnapshot())
assertEquals(sameEpochSnapshotId.offset, log.startOffset)
@@ -356,10 +342,7 @@ final class KafkaMetadataLogTest {
val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch
+ 1)
append(log, numberOfRecords, epoch)
-
- Using(log.createNewSnapshotUnchecked(greaterEpochSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, greaterEpochSnapshotId)
assertTrue(log.truncateToLatestSnapshot())
assertEquals(greaterEpochSnapshotId.offset, log.startOffset)
@@ -376,27 +359,18 @@ final class KafkaMetadataLogTest {
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId1)
append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId2)
append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId3)
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
- append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot
=>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, greaterSnapshotId)
assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId())
assertTrue(log.truncateToLatestSnapshot())
@@ -487,7 +461,7 @@ final class KafkaMetadataLogTest {
metadataDir: File,
snapshotId: OffsetAndEpoch
): Unit = {
- Using(FileRawSnapshotWriter.create(metadataDir.toPath,
snapshotId))(_.freeze())
+ Using.resource(FileRawSnapshotWriter.create(metadataDir.toPath,
snapshotId))(_.freeze())
}
@Test
@@ -499,18 +473,14 @@ final class KafkaMetadataLogTest {
append(log, numberOfRecords, epoch)
val olderEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch - 1)
- Using(log.createNewSnapshotUnchecked(olderEpochSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
-
+ createNewSnapshotUnckecked(log, olderEpochSnapshotId)
assertFalse(log.truncateToLatestSnapshot())
append(log, numberOfRecords, epoch)
+
val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(olderOffsetSnapshotId).get()) {
snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, olderOffsetSnapshotId)
assertFalse(log.truncateToLatestSnapshot())
}
@@ -523,10 +493,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(1, epoch)
append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
-
+ createNewSnapshotUnckecked(log, snapshotId)
log.close()
// Create a few partial snapshots
@@ -560,27 +527,19 @@ final class KafkaMetadataLogTest {
append(log, 1, epoch - 1)
val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId1).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId1)
append(log, 1, epoch)
val oldSnapshotId2 = new OffsetAndEpoch(2, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId2).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId2)
append(log, numberOfRecords - 2, epoch)
val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(oldSnapshotId3).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, oldSnapshotId3)
val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch)
append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(greaterSnapshotId).get()) { snapshot
=>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, greaterSnapshotId)
log.close()
@@ -609,9 +568,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1)
append(log, numberOfRecords, epoch)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
log.close()
@@ -707,9 +664,7 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords,
epoch - 1)
assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
@@ -727,9 +682,8 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
+
// Simulate log cleaning advancing the LSO
log.log.maybeIncrementLogStartOffset(offset,
LogStartOffsetIncrementReason.SegmentDeletion)
@@ -749,9 +703,7 @@ final class KafkaMetadataLogTest {
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, epoch)
- Using(log.createNewSnapshot(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshot(log, snapshotId)
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind)
@@ -766,9 +718,7 @@ final class KafkaMetadataLogTest {
val log = buildMetadataLog(tempDir, mockTime)
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
log.truncateToLatestSnapshot()
@@ -790,9 +740,7 @@ final class KafkaMetadataLogTest {
val log = buildMetadataLog(tempDir, mockTime)
log.updateHighWatermark(new LogOffsetMetadata(offset))
val snapshotId = new OffsetAndEpoch(offset, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
log.truncateToLatestSnapshot()
append(log, numOfRecords, epoch = 3)
@@ -872,16 +820,10 @@ final class KafkaMetadataLogTest {
assertFalse(log.maybeClean(), "Should not clean since no snapshots exist")
val snapshotId1 = new OffsetAndEpoch(1000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
- append(snapshot, 100)
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId1)
val snapshotId2 = new OffsetAndEpoch(2000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
- append(snapshot, 100)
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId2)
val lsoBefore = log.startOffset()
assertTrue(log.maybeClean(), "Expected to clean since there was at least
one snapshot")
@@ -910,10 +852,7 @@ final class KafkaMetadataLogTest {
for (offset <- Seq(100, 200, 300, 400, 500, 600)) {
val snapshotId = new OffsetAndEpoch(offset, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId).get()) { snapshot =>
- append(snapshot, 10)
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId)
}
assertEquals(6, log.snapshotCount())
@@ -945,14 +884,14 @@ final class KafkaMetadataLogTest {
// Then generate two snapshots
val snapshotId1 = new OffsetAndEpoch(1000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
+ Using.resource(log.createNewSnapshotUnchecked(snapshotId1).get()) {
snapshot =>
append(snapshot, 500)
snapshot.freeze()
}
// Then generate a snapshot
val snapshotId2 = new OffsetAndEpoch(2000, 1)
- Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
+ Using.resource(log.createNewSnapshotUnchecked(snapshotId2).get()) {
snapshot =>
append(snapshot, 500)
snapshot.freeze()
}
@@ -992,17 +931,14 @@ final class KafkaMetadataLogTest {
log.log.logSegments.asScala.drop(1).head.baseOffset,
1
)
- Using(log.createNewSnapshotUnchecked(snapshotId1).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId1)
+
// Generate second snapshots that includes the second segment by using the
base offset of the third segment
val snapshotId2 = new OffsetAndEpoch(
log.log.logSegments.asScala.drop(2).head.baseOffset,
1
)
- Using(log.createNewSnapshotUnchecked(snapshotId2).get()) { snapshot =>
- snapshot.freeze()
- }
+ createNewSnapshotUnckecked(log, snapshotId2)
// Sleep long enough to trigger a possible segment delete because of the
default retention
val defaultLogRetentionMs = LogConfig.DEFAULT_RETENTION_MS * 2
@@ -1074,6 +1010,18 @@ object KafkaMetadataLogTest {
log
}
+ def createNewSnapshot(log: KafkaMetadataLog, snapshotId: OffsetAndEpoch):
Unit = {
+ Using.resource(log.createNewSnapshot(snapshotId).get()) { snapshot =>
+ snapshot.freeze()
+ }
+ }
+
+ def createNewSnapshotUnckecked(log: KafkaMetadataLog, snapshotId:
OffsetAndEpoch): Unit = {
+ Using.resource(log.createNewSnapshotUnchecked(snapshotId).get()) {
snapshot =>
+ snapshot.freeze()
+ }
+ }
+
def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int):
LogAppendInfo = {
log.appendAsLeader(
MemoryRecords.withRecords(
@@ -1103,4 +1051,4 @@ object KafkaMetadataLogTest {
}
dir
}
-}
+}
\ No newline at end of file
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index 70397447f5c..68a6ba5da1d 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -59,7 +59,7 @@ class CoordinatorLoaderImplTest {
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -79,7 +79,7 @@ class CoordinatorLoaderImplTest {
val serde = mock(classOf[Deserializer[(String, String)]])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -100,7 +100,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -203,7 +203,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -246,7 +246,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -286,7 +286,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -312,7 +312,8 @@ class CoordinatorLoaderImplTest {
.thenThrow(new RuntimeException("Error!"))
val ex = assertFutureThrows(loader.load(tp, coordinator),
classOf[RuntimeException])
- assertEquals("Error!", ex.getMessage)
+
+ assertEquals(s"Deserializing record DefaultRecord(offset=0,
timestamp=-1, key=2 bytes, value=2 bytes) from $tp failed due to: Error!",
ex.getMessage)
}
}
@@ -327,7 +328,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -359,7 +360,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
val time = new MockTime()
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time,
replicaManager = replicaManager,
deserializer = serde,
@@ -414,7 +415,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -489,7 +490,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -515,7 +516,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
@@ -591,7 +592,7 @@ class CoordinatorLoaderImplTest {
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
- Using(new CoordinatorLoaderImpl[(String, String)](
+ Using.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 0e004b44271..8a8772ea08d 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -160,7 +160,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
adminClientConfig: Properties = new Properties
): Unit = {
if (isKRaftTest()) {
- Using(createAdminClient(brokers, listenerName, adminClientConfig)) {
admin =>
+ Using.resource(createAdminClient(brokers, listenerName,
adminClientConfig)) { admin =>
TestUtils.createOffsetsTopicWithAdmin(admin, brokers,
controllerServers)
}
} else {
@@ -239,7 +239,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
listenerName: ListenerName = listenerName
): Unit = {
if (isKRaftTest()) {
- Using(createAdminClient(brokers, listenerName)) { admin =>
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
@@ -433,7 +433,7 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
def changeClientIdConfig(sanitizedClientId: String, configs: Properties):
Unit = {
if (isKRaftTest()) {
- Using(createAdminClient(brokers, listenerName)) {
+ Using.resource(createAdminClient(brokers, listenerName)) {
admin => {
admin.alterClientQuotas(Collections.singleton(
new ClientQuotaAlteration(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 64ef5641f23..7ac8966d363 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -111,7 +111,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
//replicate for each of the two follower brokers.
if (!leaderThrottle) throttle = throttle * 3
- Using(createAdminClient(brokers, listenerName)) { admin =>
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
(106 to 107).foreach(registerBroker)
admin.createTopics(List(new NewTopic(topic, assignment.map(a =>
a._1.asInstanceOf[Integer] ->
a._2.map(_.asInstanceOf[Integer]).toList.asJava).asJava)).asJava).all().get()
@@ -212,7 +212,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
val expectedDuration = 4
val throttle: Long = msg.length * msgCount / expectedDuration
- Using(createAdminClient(brokers, listenerName)) { admin =>
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
registerBroker(101)
admin.createTopics(
List(new NewTopic(topic, Collections.singletonMap(0, List(100,
101).map(_.asInstanceOf[Integer]).asJava))).asJava
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index b943dd5bff6..4ac571f452a 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -283,7 +283,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness
with Logging {
}
private def createTopic(topic: String, partitionReplicaAssignment:
collection.Map[Int, Seq[Int]]): Unit = {
- Using(createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
+ Using.resource(createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
try {
TestUtils.createTopicWithAdmin(
admin = admin,
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 984b3a8eb8c..9bbfa7242c3 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -560,7 +560,7 @@ class DumpLogSegmentsTest {
val lastContainedLogTimestamp = 10000
- Using(
+ Using.resource(
new RecordsSnapshotWriter.Builder()
.setTime(new MockTime)
.setLastContainedLogTimestamp(lastContainedLogTimestamp)