This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new a9e60a0f398 KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15748)
a9e60a0f398 is described below

commit a9e60a0f398e86f2e9c43f1ae570af3f8bc15ee0
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Wed Apr 24 08:39:50 2024 +0530

    KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table (#15748)
    
    cherry-picked from: d092787
    
    Co-authored-by: hzh0425 <642256...@qq.com>
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 10 ++++--
 .../LogCleanerParameterizedIntegrationTest.scala   |  3 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |  4 +++
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 40 +++++++++++++++++++---
 4 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e0734da738a..054cc668cd9 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1465,10 +1465,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           }
         }
         localLog.checkIfMemoryMappedBufferClosed()
-        // remove the segments for lookups
-        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
+        if (segmentsToDelete.nonEmpty) {
+          // increment the local-log-start-offset or log-start-offset before 
removing the segment for lookups
+          val newLocalLogStartOffset = 
localLog.segments.higherSegment(segmentsToDelete.last.baseOffset).get.baseOffset
+          incrementStartOffset(newLocalLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)
+          // remove the segments for lookups
+          localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = 
true, reason)
+        }
         deleteProducerSnapshots(deletable, asyncDelete = true)
-        incrementStartOffset(localLog.segments.firstSegmentBaseOffset.get, 
LogStartOffsetIncrementReason.SegmentDeletion)
       }
       numToDelete
     }
diff --git 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 132a77ff97b..efd3430e126 100755
--- 
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -121,9 +121,8 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
     // Set the last modified time to an old value to force deletion of old 
segments
     val endOffset = log.logEndOffset
     log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * 
retentionMs))
-    TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset,
+    TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset && 
log.numberOfSegments == 1,
       "Timed out waiting for deletion of old segments")
-    assertEquals(1, log.numberOfSegments)
 
     cleaner.shutdown()
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index d9ab92e8a4d..ab59144e25c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -56,7 +56,9 @@ object LogTestUtils {
   def createLogConfig(segmentMs: Long = LogConfig.DEFAULT_SEGMENT_MS,
                       segmentBytes: Int = LogConfig.DEFAULT_SEGMENT_BYTES,
                       retentionMs: Long = LogConfig.DEFAULT_RETENTION_MS,
+                      localRetentionMs: Long = 
LogConfig.DEFAULT_LOCAL_RETENTION_MS,
                       retentionBytes: Long = LogConfig.DEFAULT_RETENTION_BYTES,
+                      localRetentionBytes: Long = 
LogConfig.DEFAULT_LOCAL_RETENTION_BYTES,
                       segmentJitterMs: Long = 
LogConfig.DEFAULT_SEGMENT_JITTER_MS,
                       cleanupPolicy: String = LogConfig.DEFAULT_CLEANUP_POLICY,
                       maxMessageBytes: Int = 
LogConfig.DEFAULT_MAX_MESSAGE_BYTES,
@@ -68,7 +70,9 @@ object LogTestUtils {
     logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long)
+    logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: 
java.lang.Long)
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: 
java.lang.Long)
+    logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes: java.lang.Long)
     logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, segmentJitterMs: 
java.lang.Long)
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
     logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes: 
Integer)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 90d911e0adf..e231088d7a3 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -39,6 +39,8 @@ import 
org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, 
ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.anyLong
 import org.mockito.Mockito.{mock, when}
@@ -947,8 +949,9 @@ class UnifiedLogTest {
     assertEquals(0, lastSeq)
   }
 
-  @Test
-  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+  @ParameterizedTest(name = "testRetentionDeletesProducerStateSnapshots with 
createEmptyActiveSegment: {0}")
+  @ValueSource(booleans = Array(true, false))
+  def testRetentionDeletesProducerStateSnapshots(createEmptyActiveSegment: 
Boolean): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
     val log = createLog(logDir, logConfig)
     val pid1 = 1L
@@ -962,10 +965,14 @@ class UnifiedLogTest {
     log.roll()
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
       producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    if (createEmptyActiveSegment) {
+      log.roll()
+    }
 
     log.updateHighWatermark(log.logEndOffset)
 
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+    val numProducerSnapshots = if (createEmptyActiveSegment) 3 else 2
+    assertEquals(numProducerSnapshots, 
ProducerStateManager.listSnapshotFiles(logDir).size)
     // Sleep to breach the retention period
     mockTime.sleep(1000 * 60 + 1)
     log.deleteOldSegments()
@@ -973,6 +980,7 @@ class UnifiedLogTest {
     mockTime.sleep(1)
     assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
       "expect a single producer state snapshot remaining")
+    assertEquals(3, log.logStartOffset)
   }
 
   @Test
@@ -3621,7 +3629,7 @@ class UnifiedLogTest {
         val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
         log.appendAsLeader(records, leaderEpoch = 0)
       }
-      
+
       log.updateHighWatermark(90L)
       log.maybeIncrementLogStartOffset(20L, 
LogStartOffsetIncrementReason.SegmentDeletion)
       assertEquals(20, log.logStartOffset)
@@ -3896,6 +3904,30 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, 
verificationGuard = verificationGuard)
   }
 
+  @Test
+  def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch = 0)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 3 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(30)
+
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(0, log.logStartOffset)
+    assertEquals(31, log.localLogStartOffset())
+  }
+
   private def appendTransactionalToBuffer(buffer: ByteBuffer,
                                           producerId: Long,
                                           producerEpoch: Short,

Reply via email to