satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r719511498
##########
File path:
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##########
@@ -161,77 +161,73 @@ public void
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada
throw new IllegalArgumentException("metadataUpdate: " +
metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
" can not be updated");
case COPY_SEGMENT_FINISHED:
- handleSegmentWithCopySegmentFinishedState(metadataUpdate,
existingMetadata);
+
handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
break;
case DELETE_SEGMENT_STARTED:
- handleSegmentWithDeleteSegmentStartedState(metadataUpdate,
existingMetadata);
+
handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
break;
case DELETE_SEGMENT_FINISHED:
- handleSegmentWithDeleteSegmentFinishedState(metadataUpdate,
existingMetadata);
+
handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
break;
default:
throw new IllegalArgumentException("Metadata with the state "
+ targetState + " is not supported");
}
}
- private void
handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate
metadataUpdate,
-
RemoteLogSegmentMetadata existingMetadata) {
- log.debug("Adding remote log segment metadata to leader epoch mappings
with update: [{}]", metadataUpdate);
-
- doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-
RemoteLogLeaderEpochState::handleSegmentWithCopySegmentFinishedState);
+ protected final void
handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+ (leaderEpoch,
remoteLogLeaderEpochState, startOffset, segmentId) -> {
+ long
leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch,
+
remoteLogSegmentMetadata);
+
remoteLogLeaderEpochState.handleSegmentWithCopySegmentFinishedState(startOffset,
+
segmentId,
+
leaderEpochEndOffset);
+ });
// Put the entry with the updated metadata.
- idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
- existingMetadata.createWithUpdates(metadataUpdate));
+ idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(),
remoteLogSegmentMetadata);
}
- private void
handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadataUpdate
metadataUpdate,
-
RemoteLogSegmentMetadata existingMetadata) {
- log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+ protected final void
handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ log.debug("Cleaning up the state for : [{}]",
remoteLogSegmentMetadata);
- doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-
RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentStartedState);
+ doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+ (leaderEpoch,
remoteLogLeaderEpochState, startOffset, segmentId) ->
+
remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset,
segmentId));
// Put the entry with the updated metadata.
- idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
- existingMetadata.createWithUpdates(metadataUpdate));
+ idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(),
remoteLogSegmentMetadata);
Review comment:
The `remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset,
segmentId)` call in the earlier statement removes the entries. Let me know if
I am missing something here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]