[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587106#comment-16587106 ]
ASF GitHub Bot commented on KAFKA-7278: --------------------------------------- lindong28 closed pull request #5535: Cherry-pick KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list URL: https://github.com/apache/kafka/pull/5535 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8b62918bc97..9b423ba5933 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1608,7 +1608,9 @@ class Log(@volatile var dir: File, } /** - * Perform an asynchronous delete on the given file if it exists (otherwise do nothing) + * Perform an asynchronous delete on the given file. + * + * This method assumes that the file exists and the method is not thread-safe. 
* * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because * it is either called before all logs are loaded or the caller will catch and handle IOException @@ -1655,6 +1657,8 @@ class Log(@volatile var dir: File, */ private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) { lock synchronized { + val existingOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)) + checkIfMemoryMappedBufferClosed() // need to do this in two phases to be crash safe AND do the delete asynchronously // if we crash in the middle of this we complete the swap in loadSegments() @@ -1663,7 +1667,7 @@ class Log(@volatile var dir: File, addSegment(newSegment) // delete the old files - for (seg <- oldSegments) { + for (seg <- existingOldSegments) { // remove the index entry if (seg.baseOffset != newSegment.baseOffset) segments.remove(seg.baseOffset) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index ae949bf6b85..f6001e9f375 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -21,6 +21,7 @@ import java.io.File import java.nio._ import java.nio.file.Paths import java.util.Properties +import java.util.concurrent.{CountDownLatch, TimeUnit} import kafka.common._ import kafka.server.{BrokerTopicStats, LogDirFailureChannel} @@ -88,6 +89,74 @@ class LogCleanerTest extends JUnitSuite { assertEquals(expectedBytesRead, stats.bytesRead) } + @Test + def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = { + val deleteStartLatch = new CountDownLatch(1) + val deleteCompleteLatch = new CountDownLatch(1) + + // Construct a log instance. 
The replaceSegments() method of the log instance is overridden so that + // it waits for another thread to execute deleteOldSegments() + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete) + val topicPartition = Log.parseTopicPartitionName(dir) + val producerStateManager = new ProducerStateManager(topicPartition, dir) + val log = new Log(dir, + config = LogConfig.fromProps(logConfig.originals, logProps), + logStartOffset = 0L, + recoveryPoint = 0L, + scheduler = time.scheduler, + brokerTopicStats = new BrokerTopicStats, time, + maxProducerIdExpirationMs = 60 * 60 * 1000, + producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs, + topicPartition = topicPartition, + producerStateManager = producerStateManager, + logDirFailureChannel = new LogDirFailureChannel(10)) { + override def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = { + deleteStartLatch.countDown() + if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Log segment deletion timed out") + } + super.replaceSegments(newSegment, oldSegments, isRecoveredSwapFile) + } + } + + // Start a thread which execute log.deleteOldSegments() right before replaceSegments() is executed + val t = new Thread() { + override def run(): Unit = { + deleteStartLatch.await(5000, TimeUnit.MILLISECONDS) + log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset) + log.onHighWatermarkIncremented(log.activeSegment.baseOffset) + log.deleteOldSegments() + deleteCompleteLatch.countDown() + } + } + t.start() + + // Append records so that segment number increase to 3 + while (log.numberOfSegments < 3) { + log.appendAsLeader(record(key = 0, log.logEndOffset.toInt), leaderEpoch = 0) + log.roll() + } + assertEquals(3, log.numberOfSegments) + + // Remember reference 
to the first log and determine its file name expected for async deletion + val firstLogFile = log.logSegments.head.log + val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", Log.DeletedFileSuffix) + + // Clean the log. This should trigger replaceSegments() and deleteOldSegments(); + val offsetMap = new FakeOffsetMap(Int.MaxValue) + val cleaner = makeCleaner(Int.MaxValue) + val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq + val stats = new CleanerStats() + cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats) + cleaner.cleanSegments(log, segments, offsetMap, 0L, stats) + + // Validate based on the file name that log segment file is renamed exactly once for async deletion + assertEquals(expectedFileName, firstLogFile.file().getPath) + assertEquals(2, log.numberOfSegments) + } + @Test def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = { val originalMaxFileSize = 1024; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > replaceSegments() should not call asyncDeleteSegment() for segments which > have been removed from segments list > -------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-7278 > URL: https://issues.apache.org/jira/browse/KAFKA-7278 > Project: Kafka > Issue Type: Improvement > Reporter: Dong Lin > Assignee: Dong Lin > Priority: Major > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every > segment listed in the `oldSegments`. oldSegments should be constructed from > Log.segments and only contain segments listed in Log.segments. 
> However, Log.segments may be modified between the time oldSegments is > determined and the time Log.replaceSegments() is called. If there is a > concurrent async deletion of the same log segment file, Log.replaceSegments() > will call asyncDeleteSegment() for a segment that no longer exists, and the Kafka > server may shut down the log directory due to a NoSuchFileException. > This is likely the root cause of > https://issues.apache.org/jira/browse/KAFKA-6188. > Given this understanding of the problem, we should be able to fix the issue by > only deleting a segment if it can still be found in Log.segments. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)