Zakelly commented on code in PR #27157:
URL: https://github.com/apache/flink/pull/27157#discussion_r2476657951
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java:
##########
@@ -269,6 +269,26 @@ protected PreviousSnapshot(@Nullable
Collection<HandleAndLocalPath> confirmedSst
: Collections.emptyMap();
}
+ /**
+ * Remove the sst files which have been re-uploaded in the following
checkpoint from the
+ * confirmed sst files.
+ *
+ * @param followingUploadedSstFiles the sst files uploaded in the
following checkpoint.
+ */
+ protected void removeReUploadedConfirmedSstFiles(
+ @Nonnull Collection<HandleAndLocalPath>
followingUploadedSstFiles) {
+ if (!confirmedSstFiles.isEmpty()) {
+ followingUploadedSstFiles.forEach(
+ e -> {
+ if (!(e.getHandle() instanceof
PlaceholderStreamStateHandle)) {
+ // If it's not a placeholder handle, it means
the sst file has been
+ // re-uploaded in the following checkpoint.
+ confirmedSstFiles.remove(e.getLocalPath());
Review Comment:
Sure. I wrapped all the prune logic into `PreviousSnapshot` to make it more
clear.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]