satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r609372137



##########
File path: 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
##########
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class RemoteLogMetadataCacheTest {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCacheTest.class);
+
+    private static final TopicIdPartition TP0 = new 
TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+    private static final int SEG_SIZE = 1024 * 1024;
+    private static final int BROKER_ID_0 = 0;
+    private static final int BROKER_ID_1 = 1;
+
+    private final Time time = new MockTime(1);
+
+    @Test
+    public void testSegmentsLifeCycleInCache() throws Exception {
+        RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
+        // Create remote log segment metadata and add them to 
RemoteLogMetadataCache.
+
+        // segment 0
+        // 0-100
+        // leader epochs (0,0), (1,20), (2,80)
+        Map<Integer, Long> segment0LeaderEpochs = new HashMap<>();
+        segment0LeaderEpochs.put(0, 0L);
+        segment0LeaderEpochs.put(1, 20L);
+        segment0LeaderEpochs.put(2, 80L);
+        RemoteLogSegmentId segment0Id = new RemoteLogSegmentId(TP0, 
Uuid.randomUuid());
+        RemoteLogSegmentMetadata segment0Metadata = new 
RemoteLogSegmentMetadata(segment0Id, 0L, 100L,
+                -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, 
segment0LeaderEpochs);
+        cache.addCopyInProgressSegment(segment0Metadata);
+
+        // We should not get this as the segment is still getting copied and 
it is not yet considered successful until
+        // it reaches RemoteLogSegmentState.COPY_SEGMENT_FINISHED.
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(40, 
1).isPresent());
+
+        RemoteLogSegmentMetadataUpdate segment0Update = new 
RemoteLogSegmentMetadataUpdate(
+                segment0Id, time.milliseconds(), 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+        cache.updateRemoteLogSegmentMetadata(segment0Update);
+        RemoteLogSegmentMetadata expectedSegment0Metadata = 
segment0Metadata.createWithUpdates(segment0Update);
+
+        // segment 1
+        // 101 - 200
+        // no changes in leadership with in this segment
+        // leader epochs (2, 101)
+        Map<Integer, Long> segment1LeaderEpochs = Collections.singletonMap(2, 
101L);
+        RemoteLogSegmentMetadata segment1Metadata = 
createSegmentUpdateWithState(cache, segment1LeaderEpochs, 101L, 200L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 2
+        // 201 - 300
+        // moved to epoch 3 in between
+        // leader epochs (2, 201), (3, 240)
+        Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
+        segment2LeaderEpochs.put(2, 201L);
+        segment2LeaderEpochs.put(3, 240L);
+        RemoteLogSegmentMetadata segment2Metadata = 
createSegmentUpdateWithState(cache, segment2LeaderEpochs, 201L, 300L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        // segment 3
+        // 250 - 400
+        // leader epochs (3, 250), (4, 370)
+        Map<Integer, Long> segment3LeaderEpochs = new HashMap<>();
+        segment3LeaderEpochs.put(3, 250L);
+        segment3LeaderEpochs.put(4, 370L);
+        RemoteLogSegmentMetadata segment3Metadata = 
createSegmentUpdateWithState(cache, segment3LeaderEpochs, 250L, 400L,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+
+        
//////////////////////////////////////////////////////////////////////////////////////////
+        // Four segments are added with different boundaries and leader epochs.
+        // Search for cache.remoteLogSegmentMetadata(leaderEpoch, offset)  for 
different
+        // epochs and offsets
+        
//////////////////////////////////////////////////////////////////////////////////////////
+
+        HashMap<EpochOffset, RemoteLogSegmentMetadata> 
expectedEpochOffsetToSegmentMetadata = new HashMap<>();
+        // Existing metadata entries.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 40), 
expectedSegment0Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(2, 110), 
segment1Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 240), 
segment2Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(3, 250), 
segment3Metadata);
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 375), 
segment3Metadata);
+
+        // Non existing metadata entries.
+        // Search for offset 110, epoch 1, and it should not exist.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(1, 110), 
null);
+        // Search for non existing offset 401, epoch 4.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(4, 401), 
null);
+        // Search for non existing epoch 5.
+        expectedEpochOffsetToSegmentMetadata.put(new EpochOffset(5, 301), 
null);
+
+        for (Map.Entry<EpochOffset, RemoteLogSegmentMetadata> entry : 
expectedEpochOffsetToSegmentMetadata.entrySet()) {
+            EpochOffset epochOffset = entry.getKey();
+            Optional<RemoteLogSegmentMetadata> segmentMetadata = 
cache.remoteLogSegmentMetadata(epochOffset.epoch, epochOffset.offset);
+            RemoteLogSegmentMetadata expectedSegmentMetadata = 
entry.getValue();
+            log.info("Searching for {} , result: {}, expected: {} ", 
epochOffset, segmentMetadata,
+                    expectedSegmentMetadata);
+            if (expectedSegmentMetadata != null) {
+                Assertions.assertEquals(Optional.of(expectedSegmentMetadata), 
segmentMetadata);
+            } else {
+                Assertions.assertFalse(segmentMetadata.isPresent());
+            }
+        }
+
+        // Update segment with state as DELETE_SEGMENT_STARTED.
+        // It should not be available when we search for that segment.
+        cache.updateRemoteLogSegmentMetadata(new 
RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
+                time.milliseconds(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 
10).isPresent());
+
+        // Update segment with state as DELETE_SEGMENT_FINISHED.
+        // It should not be available when we search for that segment.
+        cache.updateRemoteLogSegmentMetadata(new 
RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
+                time.milliseconds(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+        Assertions.assertFalse(cache.remoteLogSegmentMetadata(0, 
10).isPresent());
+
+        
//////////////////////////////////////////////////////////////////////////////////////////
+        //  Search for cache.highestLogOffset(leaderEpoch) for all the leader 
epochs
+        
//////////////////////////////////////////////////////////////////////////////////////////
+
+        Map<Integer, Long> expectedEpochToHighestOffset = new HashMap<>();
+        expectedEpochToHighestOffset.put(0, 19L);

Review comment:
       `highestLogOffset` can include deleted segments: it means the highest 
offset up to which the segments have been copied. Please take a look at this 
[comment](https://github.com/apache/kafka/pull/10218#discussion_r609369253).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to