mateczagany commented on code in PR #860:
URL: https://github.com/apache/flink-kubernetes-operator/pull/860#discussion_r1721835078


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java:
##########
@@ -219,64 +240,205 @@ private void observeTriggeredCheckpoint(FlinkResourceContext<CR> ctx, String job
     }
 
     /** Clean up and dispose savepoints according to the configured max 
size/age. */
+    private void cleanupSavepointHistory(FlinkResourceContext<CR> ctx) {
+        Set<FlinkStateSnapshot> snapshots = Collections.emptySet();
+        if (FlinkStateSnapshotUtils.isSnapshotResourceEnabled(
+                ctx.getOperatorConfig(), ctx.getObserveConfig())) {
+            snapshots = 
ctx.getJosdkContext().getSecondaryResources(FlinkStateSnapshot.class);
+            if (snapshots == null) {
+                snapshots = Set.of();
+            }
+        }
+
+        cleanupSavepointHistoryLegacy(ctx, snapshots);
+
+        if (CollectionUtil.isNullOrEmpty(snapshots)) {
+            return;
+        }
+        if (ctx.getObserveConfig().get(OPERATOR_SAVEPOINT_CLEANUP_ENABLED)) {
+            var savepointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), SAVEPOINT);
+            var checkpointsToDelete =
+                    getFlinkStateSnapshotsToCleanUp(
+                            snapshots, ctx.getObserveConfig(), 
ctx.getOperatorConfig(), CHECKPOINT);
+            Stream.concat(savepointsToDelete.stream(), 
checkpointsToDelete.stream())
+                    .forEach(
+                            snapshot ->
+                                    ctx.getKubernetesClient()
+                                            .resource(snapshot)
+                                            .withTimeoutInMillis(0L)
+                                            .delete());
+        }
+    }
+
+    /**
+     * Returns a list of FlinkStateSnapshot resources that should be cleaned 
up based on age/count
+     * policies.
+     *
+     * @param snapshots list of all snapshots
+     * @param observeConfig observe config
+     * @param operatorConfig operator config
+     * @param snapshotType checkpoint or savepoint
+     * @return set of FlinkStateSnapshot resources to delete
+     */
     @VisibleForTesting
-    void cleanupSavepointHistory(FlinkResourceContext<CR> ctx, SavepointInfo 
currentSavepointInfo) {
+    Set<FlinkStateSnapshot> getFlinkStateSnapshotsToCleanUp(
+            Collection<FlinkStateSnapshot> snapshots,
+            Configuration observeConfig,
+            FlinkOperatorConfiguration operatorConfig,
+            SnapshotType snapshotType) {
+        var snapshotList =
+                snapshots.stream()
+                        .filter(
+                                s ->
+                                        
CLEAN_UP_SNAPSHOT_TRIGGER_TYPES.contains(
+                                                
FlinkStateSnapshotUtils.getSnapshotTriggerType(s)))
+                        .filter(s -> (s.getSpec().isSavepoint() == 
(snapshotType == SAVEPOINT)))
+                        .sorted(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .collect(Collectors.toList());
+
+        var lastCompleteSnapshot =
+                snapshotList.stream()
+                        .filter(s -> 
COMPLETED.equals(s.getStatus().getState()))
+                        .max(Comparator.comparing(EXTRACT_SNAPSHOT_TIME))
+                        .orElse(null);
+
+        var maxCount = getMaxCountForSnapshotType(observeConfig, 
operatorConfig, snapshotType);
+        var maxTms = getMinAgeForSnapshotType(observeConfig, operatorConfig, 
snapshotType);
+        var result = new HashSet<FlinkStateSnapshot>();
+
+        if (snapshotList.size() < 2) {
+            return result;
+        }
+
+        for (var snapshot : snapshotList) {
+            if (snapshot.equals(lastCompleteSnapshot)) {
+                continue;
+            }
+
+            var ts = EXTRACT_SNAPSHOT_TIME.apply(snapshot).toEpochMilli();
+            if (snapshotList.size() - result.size() > maxCount || ts < maxTms) 
{
+                result.add(snapshot);
+            }

Review Comment:
   Good catch. I have added a check and test for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to