This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 04065a0a [FLINK-39439] Fix savepoint history entry removed from status before dispose succeeds
04065a0a is described below

commit 04065a0a3c16d485799fb05e96c8b0f0f3ff9b47
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Apr 15 13:56:57 2026 +0200

    [FLINK-39439] Fix savepoint history entry removed from status before dispose succeeds
---
 .../operator/observer/SnapshotObserver.java        |  28 +++--
 .../observer/SnapshotObserverLegacyTest.java       | 131 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 9 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 8f079415..3dc7eeb2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -368,10 +368,14 @@ public class SnapshotObserver<
         var lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);
 
         while (savepointHistory.size() > maxCount) {
-            // remove oldest entries
-            var sp = savepointHistory.remove(0);
-            if (savepointCleanupEnabled) {
-                disposeSavepointQuietly(ctx, sp.getLocation());
+            // Remove oldest entry only after successful dispose to avoid 
orphaning files.
+            // Break on failure — entries are ordered oldest-first so we 
cannot skip this one
+            // and remove a newer entry instead, as that would leave the 
oldest orphaned.
+            var sp = savepointHistory.get(0);
+            if (!savepointCleanupEnabled || disposeSavepointQuietly(ctx, 
sp.getLocation())) {
+                savepointHistory.remove(0);
+            } else {
+                break;
             }
         }
 
@@ -382,21 +386,27 @@ public class SnapshotObserver<
                 continue;
             }
             if (sp.getTimeStamp() < maxTms) {
-                it.remove();
-                if (savepointCleanupEnabled) {
-                    disposeSavepointQuietly(ctx, sp.getLocation());
+                // Remove entry only after successful dispose to avoid 
orphaning files.
+                // Each entry is independent here so we continue on failure 
rather than breaking.
+                if (!savepointCleanupEnabled || disposeSavepointQuietly(ctx, 
sp.getLocation())) {
+                    it.remove();
                 }
             }
         }
     }
 
-    private void disposeSavepointQuietly(FlinkResourceContext<CR> ctx, String 
path) {
+    private boolean disposeSavepointQuietly(FlinkResourceContext<CR> ctx, 
String path) {
         try {
             LOG.info("Disposing savepoint {}", path);
             ctx.getFlinkService().disposeSavepoint(path, 
ctx.getObserveConfig());
+            return true;
         } catch (Exception e) {
             // savepoint dispose error should not affect the deployment
-            LOG.error("Exception while disposing savepoint {}", path, e);
+            LOG.error(
+                    "Exception while disposing savepoint {}, will retry on 
next reconcile",
+                    path,
+                    e);
+            return false;
         }
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
index 0eee4712..0f299274 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
@@ -318,6 +318,137 @@ public class SnapshotObserverLegacyTest extends OperatorTestBase {
                 Collections.emptyList(), flinkService.getDisposedSavepoints());
     }
 
+    @Test
+    public void testCountBasedDisposeRetainsEntryOnFailure() {
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
+        Configuration conf = new Configuration();
+        conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED, 
false);
+        
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 
2);
+        configManager.updateDefaultConfig(conf);
+
+        var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+        long futureTs = System.currentTimeMillis() * 2;
+        var sp0 =
+                new Savepoint(
+                        futureTs,
+                        "sp0",
+                        SnapshotTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        0L);
+        var sp1 =
+                new Savepoint(
+                        futureTs + 1,
+                        "sp1",
+                        SnapshotTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        1L);
+        var sp2 =
+                new Savepoint(
+                        futureTs + 2,
+                        "sp2",
+                        SnapshotTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        2L);
+        spInfo.updateLastSavepoint(sp0);
+        spInfo.updateLastSavepoint(sp1);
+        spInfo.updateLastSavepoint(sp2);
+
+        flinkService.setDisposeSavepointFailure(true);
+        observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), 
Set.of());
+
+        // sp0 must still be in history because dispose failed — removing it 
would orphan the files
+        assertThat(spInfo.getSavepointHistory()).containsExactly(sp0, sp1, 
sp2);
+        assertThat(flinkService.getDisposedSavepoints()).isEmpty();
+    }
+
+    @Test
+    public void testAgeBasedDisposeRetainsEntryOnFailure() {
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
+        Configuration conf = new Configuration();
+        conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED, 
false);
+        conf.set(
+                
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                Duration.ofMillis(5));
+        configManager.updateDefaultConfig(conf);
+
+        var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+        var sp1 =
+                new Savepoint(
+                        1, "sp1", SnapshotTriggerType.MANUAL, 
SavepointFormatType.CANONICAL, 1L);
+        var sp2 =
+                new Savepoint(
+                        2, "sp2", SnapshotTriggerType.MANUAL, 
SavepointFormatType.CANONICAL, 2L);
+        spInfo.updateLastSavepoint(sp1);
+        spInfo.updateLastSavepoint(sp2);
+
+        flinkService.setDisposeSavepointFailure(true);
+        observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), 
Set.of());
+
+        // sp1 must still be in history because dispose failed — removing it 
would orphan the files
+        assertThat(spInfo.getSavepointHistory()).containsExactly(sp1, sp2);
+        assertThat(flinkService.getDisposedSavepoints()).isEmpty();
+    }
+
+    @Test
+    public void testDisposeRetryOnSubsequentReconcile() {
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
+        Configuration conf = new Configuration();
+        conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED, 
false);
+        
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 
2);
+        configManager.updateDefaultConfig(conf);
+
+        var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+        long futureTs = System.currentTimeMillis() * 2;
+        var sp0 =
+                new Savepoint(
+                        futureTs,
+                        "sp0",
+                        SnapshotTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        0L);
+        var sp1 =
+                new Savepoint(
+                        futureTs + 1,
+                        "sp1",
+                        SnapshotTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        1L);
+        var sp2 =
+                new Savepoint(
+                        futureTs + 2,
+                        "sp2",
+                        SnapshotTriggerType.MANUAL,
+                        SavepointFormatType.CANONICAL,
+                        2L);
+        spInfo.updateLastSavepoint(sp0);
+        spInfo.updateLastSavepoint(sp1);
+        spInfo.updateLastSavepoint(sp2);
+
+        // First reconcile: dispose fails (job is down), entry must be retained
+        flinkService.setDisposeSavepointFailure(true);
+        observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), 
Set.of());
+        assertThat(spInfo.getSavepointHistory()).containsExactly(sp0, sp1, 
sp2);
+        assertThat(flinkService.getDisposedSavepoints()).isEmpty();
+
+        // Second reconcile: dispose succeeds, entry must now be removed
+        flinkService.setDisposeSavepointFailure(false);
+        observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment), 
Set.of());
+        assertThat(spInfo.getSavepointHistory()).containsExactly(sp1, sp2);
+        
assertThat(flinkService.getDisposedSavepoints()).containsExactly(sp0.getLocation());
+    }
+
     @Test
     public void testPeriodicSavepoint() throws Exception {
         var conf = new Configuration();

Reply via email to