This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fade3d10ea0 KAFKA-15047: Roll active segment when it breaches the 
retention policy (#14766)
fade3d10ea0 is described below

commit fade3d10ea07eea5d6076b8fb1b68e2db5ffec48
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Nov 28 09:38:11 2023 +0530

    KAFKA-15047: Roll active segment when it breaches the retention policy 
(#14766)
    
    Roll the active segment and offload it to remote storage once it breaches 
the retention time policy.
    
    A segment is eligible for deletion once it gets uploaded to the remote 
storage. We have checks to allow only the passive segments to be uploaded, so 
the active segment never gets removed at all even if it breaches the retention 
time. For low-throughput/stale topics, the active segment can hold the data 
beyond the retention time configured by the user.
    
    Reviewers: Satish Duggana <[email protected]>, Christo Lolov 
<[email protected]>
---
 core/src/main/scala/kafka/log/LocalLog.scala       |  34 +-----
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  70 +++++++++---
 .../test/scala/unit/kafka/log/LocalLogTest.scala   |  68 ------------
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   4 +
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 117 ++++++++++++++++++++-
 .../tiered/storage/actions/ProduceAction.java      |   9 +-
 .../RollAndOffloadActiveSegmentTest.java           |  75 +++++++++++++
 7 files changed, 261 insertions(+), 116 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LocalLog.scala 
b/core/src/main/scala/kafka/log/LocalLog.scala
index 27b89864ffc..92758d01144 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -32,7 +32,7 @@ import java.util
 import java.util.concurrent.atomic.AtomicLong
 import java.util.regex.Pattern
 import java.util.{Collections, Optional}
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -257,36 +257,6 @@ class LocalLog(@volatile private var _dir: File,
     }
   }
 
-  /**
-   * Find segments starting from the oldest until the user-supplied predicate 
is false.
-   * A final segment that is empty will never be returned.
-   *
-   * @param predicate A function that takes in a candidate log segment, the 
next higher segment
-   *                  (if there is one). It returns true iff the segment is 
deletable.
-   * @return the segments ready to be deleted
-   */
-  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-    if (segments.isEmpty) {
-      Seq.empty
-    } else {
-      val deletable = ArrayBuffer.empty[LogSegment]
-      val segmentsIterator = segments.values.iterator
-      var segmentOpt = nextOption(segmentsIterator)
-      while (segmentOpt.isDefined) {
-        val segment = segmentOpt.get
-        val nextSegmentOpt = nextOption(segmentsIterator)
-        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
-        if (predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
-          deletable += segment
-          segmentOpt = nextSegmentOpt
-        } else {
-          segmentOpt = Option.empty
-        }
-      }
-      deletable
-    }
-  }
-
   /**
    * This method deletes the given log segments by doing the following for 
each of them:
    * - It removes the segment from the segment map so that it will no longer 
be used for reads.
@@ -982,7 +952,7 @@ object LocalLog extends Logging {
    * @tparam T the type of object held within the iterator
    * @return Some(iterator.next) if a next element exists, None otherwise.
    */
-  private def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
+  private[log] def nextOption[T](iterator: util.Iterator[T]): Option[T] = {
     if (iterator.hasNext)
       Some(iterator.next())
     else
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 5cd5014c502..0691cff51d0 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import com.yammer.metrics.core.MetricName
 import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
+import kafka.log.LocalLog.nextOption
 import kafka.log.remote.RemoteLogManager
 import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, RequestLocal}
 import kafka.utils._
@@ -49,7 +50,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.stream.Collectors
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import scala.annotation.nowarn
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Seq, immutable, mutable}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -1424,18 +1425,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
                                 reason: SegmentDeletionReason): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-      // Check not to delete segments which are not yet copied to tiered 
storage if remote log is enabled.
-      (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage)) &&
-        // We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
-        // offset can never exceed it.
-        highWatermark >= upperBoundOffset &&
-        predicate(segment, nextSegmentOpt)
-    }
     lock synchronized {
-      val deletable = localLog.deletableSegments(shouldDelete)
+      val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         deleteSegments(deletable, reason)
       else
@@ -1443,6 +1434,61 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate 
is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the 
next higher segment
+   *                  (if there is one). It returns true iff the segment is 
deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+      // Segments are eligible for deletion when:
+      //    1. they are uploaded to the remote storage
+      if (remoteLogEnabled()) {
+        upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage
+      } else {
+        true
+      }
+    }
+
+    if (localLog.segments.isEmpty) {
+      Seq.empty
+    } else {
+      val deletable = ArrayBuffer.empty[LogSegment]
+      val segmentsIterator = localLog.segments.values.iterator
+      var segmentOpt = nextOption(segmentsIterator)
+      var shouldRoll = false
+      while (segmentOpt.isDefined) {
+        val segment = segmentOpt.get
+        val nextSegmentOpt = nextOption(segmentsIterator)
+        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+        val upperBoundOffset = if (nextSegmentOpt.nonEmpty) 
nextSegmentOpt.get.baseOffset() else logEndOffset
+        // We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
+        // offset can never exceed it.
+        val predicateResult = highWatermark >= upperBoundOffset && 
predicate(segment, nextSegmentOpt)
+
+        // Roll the active segment when it breaches the configured retention 
policy. The rolled segment will be
+        // eligible for deletion and gets removed in the next iteration.
+        if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && 
segment.size > 0) {
+          shouldRoll = true
+        }
+        if (predicateResult && !isLastSegmentAndEmpty && 
isSegmentEligibleForDeletion(upperBoundOffset)) {
+          deletable += segment
+          segmentOpt = nextSegmentOpt
+        } else {
+          segmentOpt = Option.empty
+        }
+      }
+      if (shouldRoll) {
+        info("Rolling the active segment to make it eligible for deletion")
+        roll()
+      }
+      deletable
+    }
+  }
+
   private def incrementStartOffset(startOffset: Long, reason: 
LogStartOffsetIncrementReason): Unit = {
     if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset, 
reason)
     else maybeIncrementLogStartOffset(startOffset, reason)
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala 
b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index 924b8920bee..29b5fd34f90 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -330,74 +330,6 @@ class LocalLogTest {
     testDeleteSegmentFiles(asyncDelete = true)
   }
 
-  @Test
-  def testDeletableSegmentsFilter(): Unit = {
-    for (offset <- 0 to 8) {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = offset)
-      log.roll()
-    }
-
-    assertEquals(10, log.segments.numberOfSegments)
-
-    {
-      val deletable = log.deletableSegments(
-        (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
-      val expected = 
log.segments.nonActiveLogSegmentsFrom(0L).asScala.filter(segment => 
segment.baseOffset <= 5).toList
-      assertEquals(6, expected.length)
-      assertEquals(expected, deletable.toList)
-    }
-
-    {
-      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
-      val expected = log.segments.nonActiveLogSegmentsFrom(0L).asScala.toList
-      assertEquals(9, expected.length)
-      assertEquals(expected, deletable.toList)
-    }
-
-    {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = 9L)
-      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
-      val expected = log.segments.values.asScala.toList
-      assertEquals(10, expected.length)
-      assertEquals(expected, deletable.toList)
-    }
-  }
-
-  @Test
-  def testDeletableSegmentsIteration(): Unit = {
-    for (offset <- 0 to 8) {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = offset)
-      log.roll()
-    }
-
-    assertEquals(10L, log.segments.numberOfSegments)
-
-    var offset = 0
-    val deletableSegments = log.deletableSegments(
-      (segment: LogSegment, nextSegmentOpt: Option[LogSegment]) => {
-        assertEquals(offset, segment.baseOffset)
-        val floorSegmentOpt = log.segments.floorSegment(offset)
-        assertTrue(floorSegmentOpt.isPresent)
-        assertEquals(floorSegmentOpt.get, segment)
-        if (offset == log.logEndOffset) {
-          assertFalse(nextSegmentOpt.isDefined)
-        } else {
-          assertTrue(nextSegmentOpt.isDefined)
-          val higherSegmentOpt = log.segments.higherSegment(segment.baseOffset)
-          assertTrue(higherSegmentOpt.isPresent)
-          assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
-          assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
-        }
-        offset += 1
-        true
-      })
-    assertEquals(10L, log.segments.numberOfSegments)
-    assertEquals(log.segments.nonActiveLogSegmentsFrom(0L).asScala.toSeq, 
deletableSegments.toSeq)
-  }
-
   @Test
   def testCreateAndDeleteSegment(): Unit = {
     val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index ad47da05a00..c3f630e7646 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -56,7 +56,9 @@ object LogTestUtils {
   def createLogConfig(segmentMs: Long = LogConfig.DEFAULT_SEGMENT_MS,
                       segmentBytes: Int = LogConfig.DEFAULT_SEGMENT_BYTES,
                       retentionMs: Long = LogConfig.DEFAULT_RETENTION_MS,
+                      localRetentionMs: Long = 
LogConfig.DEFAULT_LOCAL_RETENTION_MS,
                       retentionBytes: Long = LogConfig.DEFAULT_RETENTION_BYTES,
+                      localRetentionBytes: Long = 
LogConfig.DEFAULT_LOCAL_RETENTION_BYTES,
                       segmentJitterMs: Long = 
LogConfig.DEFAULT_SEGMENT_JITTER_MS,
                       cleanupPolicy: String = LogConfig.DEFAULT_CLEANUP_POLICY,
                       maxMessageBytes: Int = 
LogConfig.DEFAULT_MAX_MESSAGE_BYTES,
@@ -68,7 +70,9 @@ object LogTestUtils {
     logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: java.lang.Long)
+    logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs: 
java.lang.Long)
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes: 
java.lang.Long)
+    logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes: java.lang.Long)
     logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, segmentJitterMs: 
java.lang.Long)
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
     logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes: 
Integer)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 86bcf6c878b..9f9acaf33c7 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogSegment, 
LogStartOffsetIncrementReason, ProducerStateManager, 
ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, ProducerStateManager, 
ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -3911,6 +3911,121 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, 
verificationGuard = verificationGuard)
   }
 
+  @Test
+  def testDeletableSegmentsFilter(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    for (_ <- 0 to 8) {
+      val records = TestUtils.records(List(
+        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
+      ))
+      log.appendAsLeader(records, leaderEpoch = 0)
+      log.roll()
+    }
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+
+    assertEquals(10, log.logSegments.size())
+
+    {
+      val deletable = log.deletableSegments(
+        (segment: LogSegment, _: Option[LogSegment]) => segment.baseOffset <= 
5)
+      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.filter(segment 
=> segment.baseOffset <= 5).toList
+      assertEquals(6, expected.length)
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
+      val expected = log.nonActiveLogSegmentsFrom(0L).asScala.toList
+      assertEquals(9, expected.length)
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val records = TestUtils.records(List(
+        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
+      ))
+      log.appendAsLeader(records, leaderEpoch = 0)
+      log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment]) => true)
+      val expected = log.logSegments.asScala.toList
+      assertEquals(10, expected.length)
+      assertEquals(expected, deletable.toList)
+    }
+  }
+
+  @Test
+  def testDeletableSegmentsIteration(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    for (_ <- 0 to 8) {
+      val records = TestUtils.records(List(
+        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
+      ))
+      log.appendAsLeader(records, leaderEpoch = 0)
+      log.roll()
+    }
+    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
+
+    assertEquals(10, log.logSegments.size())
+
+    var offset = 0
+    val deletableSegments = log.deletableSegments(
+      (segment: LogSegment, nextSegmentOpt: Option[LogSegment]) => {
+        assertEquals(offset, segment.baseOffset)
+        val logSegments = new LogSegments(log.topicPartition)
+        log.logSegments.forEach(segment => logSegments.add(segment))
+        val floorSegmentOpt = logSegments.floorSegment(offset)
+        assertTrue(floorSegmentOpt.isPresent)
+        assertEquals(floorSegmentOpt.get, segment)
+        if (offset == log.logEndOffset) {
+          assertFalse(nextSegmentOpt.isDefined)
+        } else {
+          assertTrue(nextSegmentOpt.isDefined)
+          val higherSegmentOpt = logSegments.higherSegment(segment.baseOffset)
+          assertTrue(higherSegmentOpt.isPresent)
+          assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
+          assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
+        }
+        offset += 1
+        true
+      })
+    assertEquals(10L, log.logSegments.size())
+    assertEquals(log.nonActiveLogSegmentsFrom(0L).asScala.toSeq, 
deletableSegments.toSeq)
+  }
+
+  @Test
+  def testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage(): 
Unit = {
+    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
segmentIndexBytes = 12,
+      retentionMs = 3, localRetentionMs = 1, fileDeleteDelayMs = 0, 
remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    // Append 1 message to the active segment
+    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes))),
+      leaderEpoch = 0)
+    // Update the highWatermark so that these segments will be eligible for 
deletion.
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(1, log.logSegments.size)
+    assertEquals(0, log.activeSegment.baseOffset())
+
+    mockTime.sleep(2)
+    // It should have rolled the active segment as they are eligible for 
deletion
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    log.logSegments.asScala.zipWithIndex.foreach {
+      case (segment, idx) => assertEquals(idx, segment.baseOffset)
+    }
+
+    // Once rolled, the segment should be uploaded to remote storage and 
eligible for deletion
+    log.updateHighestOffsetInRemoteStorage(1)
+    log.deleteOldSegments()
+    assertEquals(1, log.logSegments.size)
+    assertEquals(1, log.logSegments.asScala.head.baseOffset())
+    assertEquals(1, log.localLogStartOffset())
+    assertEquals(1, log.logEndOffset)
+    assertEquals(0, log.logStartOffset)
+  }
+
   private def appendTransactionalToBuffer(buffer: ByteBuffer,
                                           producerId: Long,
                                           producerEpoch: Short,
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
index 344d8c4afbd..c697284171e 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
@@ -45,9 +45,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public final class ProduceAction implements TieredStorageTestAction {
 
-    // How much time to wait for all remote log segments of a topic-partition 
to be offloaded
-    // to the second-tier storage.
-    private static final int OFFLOAD_WAIT_TIMEOUT_SEC = 20;
+    /**
+     * How much time to wait for all remote log segments of a topic-partition 
to be offloaded to the second-tier storage.
+     * This timeout should exceed the {@link 
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils#STORAGE_WAIT_TIMEOUT_SEC}
+     * so that the test can verify that the active segment gets rolled and 
offloaded to the remote storage.
+     */
+    private static final int OFFLOAD_WAIT_TIMEOUT_SEC = 40;
 
     private final TopicPartition topicPartition;
     private final List<OffloadedSegmentSpec> offloadedSegmentSpecs;
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
new file mode 100644
index 00000000000..8dd80772b85
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test to verify that the active segment is rolled and uploaded to remote 
storage when the segment breaches the
+ * local log retention policy.
+ */
+public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 1;
+        final Integer maxBatchCountPerSegment = 1;
+        final Map<Integer, List<Integer>> replicaAssignment = null;
+        final boolean enableRemoteLogStorage = true;
+
+        // Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
+        builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
+                        enableRemoteLogStorage)
+                // update the topic config such that it triggers the rolling 
of the active segment
+                .updateTopicConfig(topicA, configsToBeAdded(), 
Collections.emptyList())
+                // produce events to partition 0 and expect all the 4 segments 
to be offloaded
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"))
+                // consume from the beginning of the topic to read data from 
local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 4)
+                .consume(topicA, p0, 0L, 4, 4);
+    }
+
+    private Map<String, String> configsToBeAdded() {
+        // Update localLog retentionMs to 1 ms and segment roll-time to 10 ms
+        Map<String, String> topicConfigs = new HashMap<>();
+        topicConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1");
+        return topicConfigs;
+    }
+}

Reply via email to