This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b0fd99106d3 MINOR: Close UnifiedLog created in tests to avoid resource
leak (#14453)
b0fd99106d3 is described below
commit b0fd99106d3f814026f0c6ab7a58c54d65b96a3b
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Fri Sep 29 11:00:01 2023 +0100
MINOR: Close UnifiedLog created in tests to avoid resource leak (#14453)
Reviewers: Divij Vaidya <[email protected]>, Luke Chen <[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 4 ++--
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala | 10 ++++++++--
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala | 6 +++++-
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala | 4 ++--
4 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f5a01c15637..10164a3541f 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -104,7 +104,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean,
val remoteStorageSystemEnable: Boolean = false,
- @volatile private var logOffsetsListener: LogOffsetsListener
= LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
+ @volatile private var logOffsetsListener: LogOffsetsListener
= LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with AutoCloseable
{
import kafka.log.UnifiedLog._
@@ -643,7 +643,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Close this log.
* The memory mapped buffer for index files of this log will be left open
until the log is deleted.
*/
- def close(): Unit = {
+ override def close(): Unit = {
debug("Closing log")
lock synchronized {
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c2aa991e38e..577e4a6f735 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -58,6 +58,7 @@ class LogLoaderTest {
val producerIdExpirationCheckIntervalMs: Int =
kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+ var logsToClose: Seq[UnifiedLog] = Seq()
val mockTime = new MockTime()
@BeforeEach
@@ -69,6 +70,7 @@ class LogLoaderTest {
@AfterEach
def tearDown(): Unit = {
brokerTopicStats.close()
+ logsToClose.foreach(l => Utils.closeQuietly(l, "UnifiedLog"))
Utils.delete(tmpDir)
}
@@ -257,8 +259,10 @@ class LogLoaderTest {
maxProducerIdExpirationMs: Int =
producerStateManagerConfig.producerIdExpirationMs,
producerIdExpirationCheckIntervalMs: Int =
producerIdExpirationCheckIntervalMs,
lastShutdownClean: Boolean = true): UnifiedLog = {
- LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time,
logStartOffset, recoveryPoint,
+ val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler,
time, logStartOffset, recoveryPoint,
maxTransactionTimeoutMs, new
ProducerStateManagerConfig(maxProducerIdExpirationMs, false),
producerIdExpirationCheckIntervalMs, lastShutdownClean)
+ logsToClose = logsToClose :+ log
+ log
}
private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog,
LogSegment) = {
@@ -274,7 +278,9 @@ class LogLoaderTest {
private def recoverAndCheck(config: LogConfig, expectedKeys:
Iterable[Long]): UnifiedLog = {
// method is called only in case of recovery from hard reset
- LogTestUtils.recoverAndCheck(logDir, config, expectedKeys,
brokerTopicStats, mockTime, mockTime.scheduler)
+ val recoveredLog = LogTestUtils.recoverAndCheck(logDir, config,
expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler)
+ logsToClose = logsToClose :+ recoveredLog
+ recoveredLog
}
/**
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 90d911e0adf..6cd1a5f3e19 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -59,6 +59,7 @@ class UnifiedLogTest {
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val mockTime = new MockTime()
+ var logsToClose: Seq[UnifiedLog] = Seq()
val producerStateManagerConfig = new
ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
def metricsKeySet =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
@@ -71,6 +72,7 @@ class UnifiedLogTest {
@AfterEach
def tearDown(): Unit = {
brokerTopicStats.close()
+ logsToClose.foreach(l => Utils.closeQuietly(l, "UnifiedLog"))
Utils.delete(tmpDir)
}
@@ -3954,10 +3956,12 @@ class UnifiedLogTest {
remoteStorageSystemEnable: Boolean = false,
remoteLogManager: Option[RemoteLogManager] = None,
logOffsetsListener: LogOffsetsListener =
LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
- LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time,
logStartOffset, recoveryPoint,
+ val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler,
time, logStartOffset, recoveryPoint,
maxTransactionTimeoutMs, producerStateManagerConfig,
producerIdExpirationCheckIntervalMs,
lastShutdownClean, topicId, keepPartitionMetadataFile, new
ConcurrentHashMap[String, Int],
remoteStorageSystemEnable, remoteLogManager, logOffsetsListener)
+ logsToClose = logsToClose :+ log
+ log
}
private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog,
LogSegment) = {
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 7cb26277e93..d2c8e663764 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -105,7 +105,7 @@ class DumpLogSegmentsTest {
@AfterEach
def tearDown(): Unit = {
- log.close()
+ Utils.closeQuietly(log, "UnifiedLog")
Utils.delete(tmpDir)
}
@@ -236,7 +236,7 @@ class DumpLogSegmentsTest {
def testDumpMetadataRecords(): Unit = {
val mockTime = new MockTime
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats,
mockTime.scheduler, mockTime)
+ log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats,
mockTime.scheduler, mockTime)
val metadataRecords = Seq(
new ApiMessageAndVersion(