satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610462859



##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This class represents the in-memory state of segments associated with a 
leader epoch. This includes the mapping of offset to
+ * segment ids and unreferenced segments which are not mapped to any offset 
but they exist in remote storage.
+ * <p>
+ * This is used by {@link RemoteLogMetadataCache} to track the segments for 
each leader epoch.
+ */
+class RemoteLogLeaderEpochState {
+
+    // It contains offset to segment ids mapping with the segment state as 
COPY_SEGMENT_FINISHED.
+    private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new 
ConcurrentSkipListMap<>();
+
+    /**
+     * It represents unreferenced segments for this leader epoch. It contains 
the segments still in COPY_SEGMENT_STARTED
+     * and DELETE_SEGMENT_STARTED state or these have been replaced by callers 
with other segments having the same
+     * start offset for the leader epoch. These will be returned by {@link 
RemoteLogMetadataCache#listAllRemoteLogSegments()}
+     * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int 
leaderEpoch)} so that callers can clean them up if
+     * they still exist. These will be cleaned from the cache once they reach 
DELETE_SEGMENT_FINISHED state.
+     */
+    private final Set<RemoteLogSegmentId> unreferencedSegmentIds = 
ConcurrentHashMap.newKeySet();
+
+    // It represents the highest log offset of the segments that were updated 
with updateHighestLogOffset.
+    private volatile Long highestLogOffset;
+
+    /**
+     * Returns all the segments associated with this leader epoch sorted by 
start offset in ascending order.
+     *
+     * @param idToSegmentMetadata mapping of id to segment metadata. This will 
be used to get RemoteLogSegmentMetadata
+     *                            for an id to be used for sorting.
+     */
+    Iterator<RemoteLogSegmentMetadata> 
listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata) {
+        // Return all the segments including unreferenced metadata.
+        int size = offsetToId.size() + unreferencedSegmentIds.size();
+        if (size == 0) {
+            return Collections.emptyIterator();
+        }
+
+        ArrayList<RemoteLogSegmentMetadata> metadataList = new 
ArrayList<>(size);
+        for (RemoteLogSegmentId id : offsetToId.values()) {
+            metadataList.add(idToSegmentMetadata.get(id));
+        }
+
+        if (!unreferencedSegmentIds.isEmpty()) {
+            for (RemoteLogSegmentId id : unreferencedSegmentIds) {
+                metadataList.add(idToSegmentMetadata.get(id));
+            }
+
+            // sort only when unreferenced entries exist as they are already 
sorted in offsetToId.
+            
metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+        }
+
+        return metadataList.iterator();
+    }
+
+    void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId 
remoteLogSegmentId) {
+        // Add this to unreferenced set of segments for the respective leader 
epoch.
+        unreferencedSegmentIds.add(remoteLogSegmentId);

Review comment:
       It already replaces the existing entry 
[here](https://github.com/apache/kafka/pull/10218/files#diff-3724bb53d7ab4bc5a6ec4e1ab4c91c47bf90e4166d881f7706e2adc1848a5d16R299).
 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to