mateczagany commented on code in PR #860: URL: https://github.com/apache/flink-kubernetes-operator/pull/860#discussion_r1721835078
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java: ########## @@ -219,64 +240,205 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job } /** Clean up and dispose savepoints according to the configured max size/age. */ + private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) { + Set<FlinkStateSnapshot> snapshots = Collections.emptySet(); + if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled( + ctx.getOperatorConfig(), ctx.getObserveConfig())) { + snapshots = ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class); + if (snapshots == null) { + snapshots = Set.of(); + } + } + + cleanupSavepointHistoryLegacy(ctx, snapshots); + + if (CollectionUtil.isNullOrEmpty(snapshots)) { + return; + } + if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) { + var savepointsToDelete = + getFlinkStateSnapshotsToCleanUp( + snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), SAVEPOINT); + var checkpointsToDelete = + getFlinkStateSnapshotsToCleanUp( + snapshots, ctx.getObserveConfig(), ctx.getOperatorConfig(), CHECKPOINT); + Stream.concat(savepointsToDelete.stream(), checkpointsToDelete.stream()) + .forEach( + snapshot -> + ctx.getKubernetesClient() + .resource(snapshot) + .withTimeoutInMillis(0L) + .delete()); + } + } + + /** + * Returns a list of FlinkStateSnapshot resources that should be cleaned up based on age/count + * policies. 
+ * + * @param snapshots list of all snapshots + * @param observeConfig observe config + * @param operatorConfig operator config + * @param snapshotType checkpoint or savepoint + * @return set of FlinkStateSnapshot resources to delete + */ @VisibleForTesting - void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo currentSavepointInfo) { + Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp( + Collection<FlinkStateSnapshot> snapshots, + Configuration observeConfig, + FlinkOperatorConfiguration operatorConfig, + SnapshotType snapshotType) { + var snapshotList = + snapshots.stream() + .filter( + s -> + CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains( + FlinkStateSnapshotUtils.getSnapshotTriggerType(s))) + .filter(s -> (s.getSpec().isSavepoint() == (snapshotType == SAVEPOINT))) + .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME)) + .collect(Collectors.toList()); + + var lastCompleteSnapshot = + snapshotList.stream() + .filter(s -> COMPLETED.equals(s.getStatus().getState())) + .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME)) + .orElse(null); + + var maxCount = getMaxCountForSnapshotType(observeConfig, operatorConfig, snapshotType); + var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, snapshotType); + var result = new HashSet<FlinkStateSnapshot>(); + + if (snapshotList.size() < 2) { + return result; + } + + for (var snapshot : snapshotList) { + if (snapshot.equals(lastCompleteSnapshot)) { + continue; + } + + var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli(); + if (snapshotList.size() - result.size() > maxCount || ts < maxTms) { + result.add(snapshot); + } Review Comment: Good catch. I have added a check and test for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org