satishd commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r610462859
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * This class represents the in-memory state of segments associated with a leader epoch. This includes the mapping of offset to + * segment ids and unreferenced segments which are not mapped to any offset but they exist in remote storage. + * <p> + * This is used by {@link RemoteLogMetadataCache} to track the segments for each leader epoch. + */ +class RemoteLogLeaderEpochState { + + // It contains offset to segment ids mapping with the segment state as COPY_SEGMENT_FINISHED. + private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new ConcurrentSkipListMap<>(); + + /** + * It represents unreferenced segments for this leader epoch. It contains the segments still in COPY_SEGMENT_STARTED + * and DELETE_SEGMENT_STARTED state or these have been replaced by callers with other segments having the same + * start offset for the leader epoch. These will be returned by {@link RemoteLogMetadataCache#listAllRemoteLogSegments()} + * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int leaderEpoch)} so that callers can clean them up if + * they still exist. These will be cleaned from the cache once they reach DELETE_SEGMENT_FINISHED state. + */ + private final Set<RemoteLogSegmentId> unreferencedSegmentIds = ConcurrentHashMap.newKeySet(); + + // It represents the highest log offset of the segments that were updated with updateHighestLogOffset. + private volatile Long highestLogOffset; + + /** + * Returns all the segments associated with this leader epoch sorted by start offset in ascending order. + * + * @param idToSegmentMetadata mapping of id to segment metadata. This will be used to get RemoteLogSegmentMetadata + * for an id to be used for sorting. + */ + Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata) { + // Return all the segments including unreferenced metadata. + int size = offsetToId.size() + unreferencedSegmentIds.size(); + if (size == 0) { + return Collections.emptyIterator(); + } + + ArrayList<RemoteLogSegmentMetadata> metadataList = new ArrayList<>(size); + for (RemoteLogSegmentId id : offsetToId.values()) { + metadataList.add(idToSegmentMetadata.get(id)); + } + + if (!unreferencedSegmentIds.isEmpty()) { + for (RemoteLogSegmentId id : unreferencedSegmentIds) { + metadataList.add(idToSegmentMetadata.get(id)); + } + + // sort only when unreferenced entries exist as they are already sorted in offsetToId. + metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset)); + } + + return metadataList.iterator(); + } + + void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmentId) { + // Add this to unreferenced set of segments for the respective leader epoch. + unreferencedSegmentIds.add(remoteLogSegmentId); Review comment: It already replaces the existing entry [here](https://github.com/apache/kafka/pull/10218/files#diff-3724bb53d7ab4bc5a6ec4e1ab4c91c47bf90e4166d881f7706e2adc1848a5d16R299). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org