satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r719511498
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java ########## @@ -161,77 +161,73 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada throw new IllegalArgumentException("metadataUpdate: " + metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED + " can not be updated"); case COPY_SEGMENT_FINISHED: - handleSegmentWithCopySegmentFinishedState(metadataUpdate, existingMetadata); + handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate)); break; case DELETE_SEGMENT_STARTED: - handleSegmentWithDeleteSegmentStartedState(metadataUpdate, existingMetadata); + handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate)); break; case DELETE_SEGMENT_FINISHED: - handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, existingMetadata); + handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate)); break; default: throw new IllegalArgumentException("Metadata with the state " + targetState + " is not supported"); } } - private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate, - RemoteLogSegmentMetadata existingMetadata) { - log.debug("Adding remote log segment metadata to leader epoch mappings with update: [{}]", metadataUpdate); - - doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata, - RemoteLogLeaderEpochState::handleSegmentWithCopySegmentFinishedState); + protected final void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { + doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata, + (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) -> { + long leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch, + remoteLogSegmentMetadata); + remoteLogLeaderEpochState.handleSegmentWithCopySegmentFinishedState(startOffset, + segmentId, + leaderEpochEndOffset); + }); // Put the entry with the updated metadata. - idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(), - existingMetadata.createWithUpdates(metadataUpdate)); + idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata); } - private void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadataUpdate metadataUpdate, - RemoteLogSegmentMetadata existingMetadata) { - log.debug("Cleaning up the state for : [{}]", metadataUpdate); + protected final void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { + log.debug("Cleaning up the state for : [{}]", remoteLogSegmentMetadata); - doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata, - RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentStartedState); + doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata, + (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) -> + remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset, segmentId)); // Put the entry with the updated metadata. - idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(), - existingMetadata.createWithUpdates(metadataUpdate)); + idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata); Review comment: `remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset, segmentId)` call in the earlier statement removes the entries. Let me know if I am missing something here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org