This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push: new a9e60a0f398 KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15748) a9e60a0f398 is described below commit a9e60a0f398e86f2e9c43f1ae570af3f8bc15ee0 Author: Kamal Chandraprakash <kchandraprak...@uber.com> AuthorDate: Wed Apr 24 08:39:50 2024 +0530 KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15748) cherry-picked from: d092787 Co-authored-by: hzh0425 642256...@qq.com Reviewers: Luke Chen <show...@gmail.com> --- core/src/main/scala/kafka/log/UnifiedLog.scala | 10 ++++-- .../LogCleanerParameterizedIntegrationTest.scala | 3 +- .../test/scala/unit/kafka/log/LogTestUtils.scala | 4 +++ .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 40 +++++++++++++++++++--- 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index e0734da738a..054cc668cd9 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1465,10 +1465,14 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } localLog.checkIfMemoryMappedBufferClosed() - // remove the segments for lookups - localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) + if (segmentsToDelete.nonEmpty) { + // increment the local-log-start-offset or log-start-offset before removing the segment for lookups + val newLocalLogStartOffset = localLog.segments.higherSegment(segmentsToDelete.last.baseOffset).get.baseOffset + incrementStartOffset(newLocalLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion) + // remove the segments for lookups + localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) + } deleteProducerSnapshots(deletable, asyncDelete = true) - incrementStartOffset(localLog.segments.firstSegmentBaseOffset.get, 
LogStartOffsetIncrementReason.SegmentDeletion) } numToDelete } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 132a77ff97b..efd3430e126 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -121,9 +121,8 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati // Set the last modified time to an old value to force deletion of old segments val endOffset = log.logEndOffset log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs)) - TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset, + TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset && log.numberOfSegments == 1, "Timed out waiting for deletion of old segments") - assertEquals(1, log.numberOfSegments) cleaner.shutdown() diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index d9ab92e8a4d..ab59144e25c 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -56,7 +56,9 @@ object LogTestUtils { def createLogConfig(segmentMs: Long = LogConfig.DEFAULT_SEGMENT_MS, segmentBytes: Int = LogConfig.DEFAULT_SEGMENT_BYTES, retentionMs: Long = LogConfig.DEFAULT_RETENTION_MS, + localRetentionMs: Long = LogConfig.DEFAULT_LOCAL_RETENTION_MS, retentionBytes: Long = LogConfig.DEFAULT_RETENTION_BYTES, + localRetentionBytes: Long = LogConfig.DEFAULT_LOCAL_RETENTION_BYTES, segmentJitterMs: Long = LogConfig.DEFAULT_SEGMENT_JITTER_MS, cleanupPolicy: String = LogConfig.DEFAULT_CLEANUP_POLICY, maxMessageBytes: Int = LogConfig.DEFAULT_MAX_MESSAGE_BYTES, @@ -68,7 +70,9 @@ object LogTestUtils { logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long) 
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer) logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long) + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: java.lang.Long) logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: java.lang.Long) + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes: java.lang.Long) logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, segmentJitterMs: java.lang.Long) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy) logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes: Integer) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 90d911e0adf..e231088d7a3 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -39,6 +39,8 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers.anyLong import org.mockito.Mockito.{mock, when} @@ -947,8 +949,9 @@ class UnifiedLogTest { assertEquals(0, lastSeq) } - @Test - def testRetentionDeletesProducerStateSnapshots(): Unit = { + @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with createEmptyActiveSegment: {0}") + @ValueSource(booleans = Array(true, false)) + def 
testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: Boolean): Unit = { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0) val log = createLog(logDir, logConfig) val pid1 = 1L @@ -962,10 +965,14 @@ class UnifiedLogTest { log.roll() log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1, producerEpoch = epoch, sequence = 2), leaderEpoch = 0) + if (createEmptyActiveSegment) { + log.roll() + } log.updateHighWatermark(log.logEndOffset) - assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) + val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2 + assertEquals(numProducerSnapshots, ProducerStateManager.listSnapshotFiles(logDir).size) // Sleep to breach the retention period mockTime.sleep(1000 * 60 + 1) log.deleteOldSegments() @@ -973,6 +980,7 @@ class UnifiedLogTest { mockTime.sleep(1) assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size, "expect a single producer state snapshot remaining") + assertEquals(3, log.logStartOffset) } @Test @@ -3621,7 +3629,7 @@ class UnifiedLogTest { val records = TestUtils.singletonRecords(value = s"test$i".getBytes) log.appendAsLeader(records, leaderEpoch = 0) } - + log.updateHighWatermark(90L) log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion) assertEquals(20, log.logStartOffset) @@ -3896,6 +3904,30 @@ class UnifiedLogTest { log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard) } + @Test + def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = { + val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) + + var offset = 0L + for(_ <- 0 until 50) { + val records = TestUtils.singletonRecords("test".getBytes()) + val info = 
log.appendAsLeader(records, leaderEpoch = 0) + offset = info.lastOffset + if (offset != 0 && offset % 10 == 0) + log.roll() + } + assertEquals(5, log.logSegments.size) + log.updateHighWatermark(log.logEndOffset) + // simulate calls to upload 3 segments to remote storage + log.updateHighestOffsetInRemoteStorage(30) + + log.deleteOldSegments() + assertEquals(2, log.logSegments.size) + assertEquals(0, log.logStartOffset) + assertEquals(31, log.localLogStartOffset()) + } + private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short,