This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 946ab8f4100 KAFKA-15410: Delete records with tiered storage integration test (4/4) (#14330)
946ab8f4100 is described below

commit 946ab8f4100e4f923648d1d38eca2d1640b53f79
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Thu Sep 7 21:02:39 2023 +0530

    KAFKA-15410: Delete records with tiered storage integration test (4/4) (#14330)
    
    * Added the integration test for the DELETE_RECORDS API on a tiered-storage-enabled topic
    * Added validation checks before removing remote log segments on a log-start-offset breach
    
    Reviewers: Satish Duggana <sati...@apache.org>, Luke Chen <show...@gmail.com>, Christo Lolov <lol...@amazon.com>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 108 +++++++++++++--------
 .../tiered/storage/TieredStorageTestBuilder.java   |   8 +-
 .../storage/actions/DeleteRecordsAction.java       |  39 +++++++-
 ...eleteSegmentsDueToLogStartOffsetBreachTest.java |  92 ++++++++++++++++++
 4 files changed, 202 insertions(+), 45 deletions(-)

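For readers skimming the diff: the core behavioral change in RemoteLogManager.java is that a log-start-offset breach is now validated against the current leader epoch lineage before a remote segment is deleted. A minimal sketch of the new eligibility rule, using the names that appear in the diff (the method name and surrounding handler state are assumptions for illustration):

    import java.util.NavigableMap;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

    // Sketch only: a segment breaches the log-start-offset when every epoch it carries is
    // at or below the first epoch of the current lineage AND its end offset lies strictly
    // below the log-start-offset.
    static boolean breachesLogStartOffset(RemoteLogSegmentMetadata metadata,
                                          long logStartOffset,
                                          NavigableMap<Integer, Long> leaderEpochEntries) {
        if (leaderEpochEntries.isEmpty()) {
            return false; // no lineage to validate against, so do not delete
        }
        Integer firstEpoch = leaderEpochEntries.firstKey();
        return metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
                && metadata.endOffset() < logStartOffset;
    }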
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 9b70cfbe568..4a35abf6a11 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -837,10 +837,10 @@ public class RemoteLogManager implements Closeable {
                     return false;
                 }
 
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
                     // Assumption that segments contain size >= 0
                     if (remainingBreachedSize > 0) {
-                        long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                        long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
                         if (remainingBytes >= 0) {
                             remainingBreachedSize = remainingBytes;
                             return true;
@@ -864,7 +864,7 @@ public class RemoteLogManager implements Closeable {
                 }
 
                 boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
-                        x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+                        ignored -> metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
                 if (isSegmentDeleted) {
                     remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
                     // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
@@ -876,27 +876,40 @@ public class RemoteLogManager implements Closeable {
                 return isSegmentDeleted;
             }
 
-            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
+                                                                 long logStartOffset,
+                                                                 NavigableMap<Integer, Long> leaderEpochEntries)
                     throws RemoteStorageException, ExecutionException, InterruptedException {
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
-                if (isSegmentDeleted && retentionSizeData.isPresent()) {
-                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
-                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
+                    if (!leaderEpochEntries.isEmpty()) {
+                        // Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be the same
+                        Integer firstEpoch = leaderEpochEntries.firstKey();
+                        return metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
+                                && metadata.endOffset() < logStartOffset;
+                    }
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to log-start-offset {} breach. " +
+                            "Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
+                            metadata.remoteLogSegmentId(), logStartOffset, leaderEpochEntries.firstEntry(),
+                            metadata.endOffset(), metadata.segmentLeaderEpochs());
                 }
-
                 return isSegmentDeleted;
             }
 
             // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
             // unreferenced because they are not part of the current leader epoch lineage.
-            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
-                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
-                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
+                                                                             RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored ->
+                        metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
                 if (isSegmentDeleted) {
-                    logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                    logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. " +
+                                    "Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
                             metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
                 }
-
                 // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
                 return isSegmentDeleted;
             }
@@ -904,7 +917,7 @@ public class RemoteLogManager implements Closeable {
             private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
                     throws RemoteStorageException, ExecutionException, InterruptedException {
                 if (predicate.test(segmentMetadata)) {
-                    logger.info("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
+                    logger.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
                     // Publish delete segment started event.
                     remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                             new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
@@ -917,10 +930,9 @@ public class RemoteLogManager implements Closeable {
                     remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                             new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
                                     segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
-                    logger.info("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
+                    logger.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
                     return true;
                 }
-
                 return false;
             }
 
@@ -967,7 +979,6 @@ public class RemoteLogManager implements Closeable {
             LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
             // Build the leader epoch map by filtering the epochs that do not have any records.
             NavigableMap<Integer, Long> epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
-            Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
 
             long logStartOffset = log.logStartOffset();
             long logEndOffset = log.logEndOffset();
@@ -977,24 +988,35 @@ public class RemoteLogManager implements Closeable {
 
             RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
             Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean isSegmentDeleted = true;
-            while (isSegmentDeleted && epochIterator.hasNext()) {
+            boolean canProcess = true;
+            while (canProcess && epochIterator.hasNext()) {
                 Integer epoch = epochIterator.next();
                 Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                while (canProcess && segmentsIterator.hasNext()) {
                     if (isCancelled() || !isLeader()) {
                         logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                         return;
                     }
                     RemoteLogSegmentMetadata metadata = segmentsIterator.next();
 
-                    // check whether the segment contains the required epoch range with in the current leader epoch lineage.
-                    if (isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets)) {
-                        isSegmentDeleted =
-                                remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
-                                        remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
+                    // When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
+                    // as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
+                    // remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpochs` check validates
+                    // whether the epochs present in the segment lie in the checkpoint file. It will always return false
+                    // since the checkpoint file was already truncated.
+                    boolean isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
+                            metadata, logStartOffset, epochWithOffsets);
+                    boolean isValidSegment = false;
+                    if (!isSegmentDeleted) {
+                        // check whether the segment contains the required epoch range within the current leader epoch lineage.
+                        isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
+                        if (isValidSegment) {
+                            isSegmentDeleted =
+                                    remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                            remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                        }
+                        }
                     }
+                    canProcess = isSegmentDeleted || !isValidSegment;
                 }
             }
 
@@ -1002,9 +1024,12 @@ public class RemoteLogManager implements Closeable {
             // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
             // unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
             // earliest leader epoch.
+            Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
             if (earliestEpochEntryOptional.isPresent()) {
                 EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get();
-                Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream().filter(x -> x < earliestEpochEntry.epoch).iterator();
+                Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream()
+                        .filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch)
+                        .iterator();
                 while (epochsToClean.hasNext()) {
                     int epoch = epochsToClean.next();
                     Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
@@ -1093,8 +1118,9 @@ public class RemoteLogManager implements Closeable {
         Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
         if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
-            LOGGER.debug("[{}] Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
-                    segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                            "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
             return false;
         }
 
@@ -1104,15 +1130,16 @@ public class RemoteLogManager implements Closeable {
 
             // If segment's epoch does not exist in the leader epoch lineage then it is not a valid segment.
             if (!leaderEpochs.containsKey(epoch)) {
-                LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
-                        segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " +
+                                "Remote segment epochs: {} and partition leader epochs: {}",
+                        segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
                 return false;
             }
 
             // Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset.
             if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
-                LOGGER.debug("[{}]  Remote segment {}'s first epoch {}'s offset is less than leader epoch's offset {}.",
-                        segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
+                LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.",
+                        segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
                 return false;
             }
 
@@ -1120,8 +1147,8 @@ public class RemoteLogManager implements Closeable {
             if (epoch == segmentLastEpoch) {
                 Map.Entry<Integer, Long> nextEntry = leaderEpochs.higherEntry(epoch);
                 if (nextEntry != null && segmentEndOffset > nextEntry.getValue() - 1) {
-                    LOGGER.debug("[{}]  Remote segment {}'s end offset {} is more than leader epoch's offset {}.",
-                            segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1);
+                    LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.",
+                            segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1);
                     return false;
                 }
             }
@@ -1129,13 +1156,12 @@ public class RemoteLogManager implements Closeable {
             // Next segment epoch entry and next leader epoch entry should be same to ensure that the segment's epoch
             // is within the leader epoch lineage.
             if (epoch != segmentLastEpoch && !leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) {
-                LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
-                        segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " +
+                                "Remote segment epochs: {} and partition leader epochs: {}",
+                        segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
                 return false;
             }
-
         }
-
         // segment end offset should be with in the log end offset.
         return segmentEndOffset < logEndOffset;
     }
@@ -1286,7 +1312,7 @@ public class RemoteLogManager implements Closeable {
 
         OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
         long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
-                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+                .map(position -> position.offset).orElse(segmentMetadata.endOffset() + 1);
 
         final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>();
 
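Before moving on to the test changes: the cleanup-loop rewrite above replaces the single isSegmentDeleted flag with a canProcess flag. A small runnable sketch of the new stop condition (the two booleans stand for the per-segment results computed in the diff; the class name is illustrative):

    // Decides whether the cleanup walk continues to the next remote segment.
    final class CleanupStep {
        static boolean canProcess(boolean isSegmentDeleted, boolean isValidSegment) {
            // deleted              -> continue: older segments may also breach
            // not deleted, invalid -> continue: the segment is outside the lineage and is
            //                         handled by a later run of the cleaner
            // not deleted, valid   -> stop: retention is satisfied from this segment on
            return isSegmentDeleted || !isValidSegment;
        }
    }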
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index ff489002b38..5c28c33183f 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -323,7 +323,7 @@ public final class TieredStorageTestBuilder {
                                                   Integer partition,
                                                   Long beforeOffset) {
         TopicPartition topicPartition = new TopicPartition(topic, partition);
-        actions.add(new DeleteRecordsAction(topicPartition, beforeOffset));
+        actions.add(new DeleteRecordsAction(topicPartition, beforeOffset, buildDeleteSegmentSpecList(topic)));
         return this;
     }
 
@@ -377,6 +377,10 @@ public final class TieredStorageTestBuilder {
 
     private DeleteTopicAction buildDeleteTopicAction(String topic,
                                                      Boolean shouldDelete) {
+        return new DeleteTopicAction(topic, buildDeleteSegmentSpecList(topic), shouldDelete);
+    }
+
+    private List<RemoteDeleteSegmentSpec> buildDeleteSegmentSpecList(String topic) {
         List<RemoteDeleteSegmentSpec> deleteSegmentSpecList = deletables.entrySet()
                 .stream()
                 .filter(e -> e.getKey().topic().equals(topic))
@@ -389,7 +393,7 @@ public final class TieredStorageTestBuilder {
                 })
                 .collect(Collectors.toList());
         deleteSegmentSpecList.forEach(spec -> deletables.remove(spec.getTopicPartition()));
-        return new DeleteTopicAction(topic, deleteSegmentSpecList, shouldDelete);
+        return deleteSegmentSpecList;
     }
 }
 
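The extraction above means deleteRecords() now drains the same pending deletion expectations that deleteTopic() already consumed. A hypothetical builder fragment (argument values are illustrative; the method names come from this file and the test below):

    // Registers an expectation for one DELETE_SEGMENT event on broker 0, then issues the
    // delete-records call that should trigger it; the expectation is drained from the
    // `deletables` map by buildDeleteSegmentSpecList(topic).
    builder.expectDeletionInRemoteStorage(0, "topicA", 0, DELETE_SEGMENT, 1)
           .deleteRecords("topicA", 0, 3L);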
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/DeleteRecordsAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/DeleteRecordsAction.java
index 0f81d35a052..0f6a756c048 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/DeleteRecordsAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/DeleteRecordsAction.java
@@ -16,36 +16,71 @@
  */
 package org.apache.kafka.tiered.storage.actions;
 
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition;
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
 
 import java.io.PrintStream;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
 
 public final class DeleteRecordsAction implements TieredStorageTestAction {
 
+    private static final int DELETE_WAIT_TIMEOUT_SEC = 10;
     private final TopicPartition partition;
     private final Long beforeOffset;
+    private final List<RemoteDeleteSegmentSpec> deleteSegmentSpecs;
 
     public DeleteRecordsAction(TopicPartition partition,
-                               Long beforeOffset) {
+                               Long beforeOffset,
+                               List<RemoteDeleteSegmentSpec> deleteSegmentSpecs) {
         this.partition = partition;
         this.beforeOffset = beforeOffset;
+        this.deleteSegmentSpecs = deleteSegmentSpecs;
     }
 
     @Override
-    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException {
+    public void doExecute(TieredStorageTestContext context)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        List<LocalTieredStorage> tieredStorages = context.remoteStorageManagers();
+        List<LocalTieredStorageCondition> tieredStorageConditions = deleteSegmentSpecs.stream()
+                .filter(spec -> spec.getEventType() == DELETE_SEGMENT)
+                .map(spec -> expectEvent(
+                        tieredStorages,
+                        spec.getEventType(),
+                        spec.getSourceBrokerId(),
+                        spec.getTopicPartition(),
+                        false,
+                        spec.getEventCount()))
+                .collect(Collectors.toList());
+
         Map<TopicPartition, RecordsToDelete> recordsToDeleteMap =
                 Collections.singletonMap(partition, RecordsToDelete.beforeOffset(beforeOffset));
         context.admin().deleteRecords(recordsToDeleteMap).all().get();
+
+        if (!tieredStorageConditions.isEmpty()) {
+            tieredStorageConditions.stream()
+                    .reduce(LocalTieredStorageCondition::and)
+                    .get()
+                    .waitUntilTrue(DELETE_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
+        }
     }
 
     @Override
     public void describe(PrintStream output) {
         output.printf("delete-records partition: %s, before-offset: %d%n", partition, beforeOffset);
+        deleteSegmentSpecs.forEach(spec -> output.println("    " + spec));
     }
 }
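A note on the wait logic above: the per-spec conditions are folded into one with LocalTieredStorageCondition::and before waitUntilTrue. The same fold in miniature with plain java.util.function predicates (a runnable analogue for illustration, not the real LocalTieredStorage API):

    import java.util.List;
    import java.util.function.Predicate;

    final class FoldDemo {
        public static void main(String[] args) {
            List<Predicate<Integer>> conditions = List.of(n -> n > 0, n -> n % 2 == 0);
            // Same shape as tieredStorageConditions.stream().reduce(LocalTieredStorageCondition::and):
            Predicate<Integer> all = conditions.stream()
                    .reduce(Predicate::and)
                    .orElse(n -> true); // the action skips waiting when the list is empty
            System.out.println(all.test(4)); // true: positive and even
            System.out.println(all.test(3)); // false: odd
        }
    }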
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
new file mode 100644
index 00000000000..5f10df5c68c
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class DeleteSegmentsDueToLogStartOffsetBreachTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 2;
+        final Map<Integer, List<Integer>> replicaAssignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1)));
+        final boolean enableRemoteLogStorage = true;
+        final int beginEpoch = 0;
+        final long startOffset = 3;
+        final long beforeOffset = 3L;
+        final long beforeOffset1 = 7L;
+
+        // Create topicA with 1 partition and 2 RF
+        builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
+                        enableRemoteLogStorage)
+                // produce events to partition 0 and expect 2 segments to be offloaded
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"),
+                        new KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"),
+                        new KeyValueSpec("k3", "v3"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
+                // Use the DELETE_RECORDS API to delete the records up to offset 3 and expect one remote segment to be deleted
+                .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 1)
+                .deleteRecords(topicA, p0, beforeOffset)
+                // expect that the leader epoch checkpoint is updated
+                // Comment out this line if it's FLAKY since the leader-epoch is not deterministic in ZK mode.
+                .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, startOffset)
+                // consume from offset 3 of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 1)
+                .consume(topicA, p0, 3L, 2, 1)
+
+                // switch leader to change the leader-epoch from 0 to 1
+                .expectLeader(topicA, p0, broker1, true)
+                // produce some more messages and move the log-start-offset such that earliest-epoch changes from 0 to 1
+                .expectSegmentToBeOffloaded(broker1, topicA, p0, 4, new KeyValueSpec("k4", "v4"),
+                        new KeyValueSpec("k5", "v5"))
+                .expectSegmentToBeOffloaded(broker1, topicA, p0, 6, new KeyValueSpec("k6", "v6"),
+                        new KeyValueSpec("k7", "v7"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 8L)
+                .produce(topicA, p0, new KeyValueSpec("k5", "v5"), new KeyValueSpec("k6", "v6"),
+                        new KeyValueSpec("k7", "v7"), new KeyValueSpec("k8", "v8"), new KeyValueSpec("k9", "v9"))
+                // Use the DELETE_RECORDS API to delete the records up to offset 7 and expect 2 remote segments to be deleted
+                .expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 2)
+                .deleteRecords(topicA, p0, beforeOffset1)
+                // consume from the topic with fetch-offset 7 to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker1, topicA, p0, 1)
+                .consume(topicA, p0, 7L, 3, 1);
+    }
+}
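A worked check of the deletion counts above, assuming segments roll exactly at maxBatchCountPerSegment = 2 and ignoring the epoch condition (which also holds here). The new guard in RemoteLogManager requires endOffset < logStartOffset:

    // Round 1: offloaded segments cover offsets [0,1] and [2,3]; offset 4 is still local.
    long logStartOffset = 3L;
    boolean seg01Deleted = 1L < logStartOffset; // true  -> the single expected DELETE_SEGMENT event
    boolean seg23Deleted = 3L < logStartOffset; // false -> segment [2,3] is retained

    // Round 2 after producing k5..k9: remote segments cover [2,3], [4,5] and [6,7].
    long logStartOffset2 = 7L;
    boolean seg23 = 3L < logStartOffset2; // true  -> deleted
    boolean seg45 = 5L < logStartOffset2; // true  -> deleted (the 2 expected events)
    boolean seg67 = 7L < logStartOffset2; // false -> retained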
