This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 04065a0a [FLINK-39439] Fix savepoint history entry removed from status before dispose succeeds
04065a0a is described below
commit 04065a0a3c16d485799fb05e96c8b0f0f3ff9b47
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Apr 15 13:56:57 2026 +0200
[FLINK-39439] Fix savepoint history entry removed from status before dispose succeeds
---
.../operator/observer/SnapshotObserver.java | 28 +++--
.../observer/SnapshotObserverLegacyTest.java | 131 +++++++++++++++++++++
2 files changed, 150 insertions(+), 9 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
index 8f079415..3dc7eeb2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java
@@ -368,10 +368,14 @@ public class SnapshotObserver<
var lastSavepoint = savepointHistory.get(savepointHistory.size() - 1);
while (savepointHistory.size() > maxCount) {
- // remove oldest entries
- var sp = savepointHistory.remove(0);
- if (savepointCleanupEnabled) {
- disposeSavepointQuietly(ctx, sp.getLocation());
+ // Remove oldest entry only after successful dispose to avoid orphaning files.
+ // Break on failure — entries are ordered oldest-first so we cannot skip this one
+ // and remove a newer entry instead, as that would leave the oldest orphaned.
+ var sp = savepointHistory.get(0);
+ if (!savepointCleanupEnabled || disposeSavepointQuietly(ctx,
sp.getLocation())) {
+ savepointHistory.remove(0);
+ } else {
+ break;
}
}
@@ -382,21 +386,27 @@ public class SnapshotObserver<
continue;
}
if (sp.getTimeStamp() < maxTms) {
- it.remove();
- if (savepointCleanupEnabled) {
- disposeSavepointQuietly(ctx, sp.getLocation());
+ // Remove entry only after successful dispose to avoid orphaning files.
+ // Each entry is independent here so we continue on failure rather than breaking.
+ if (!savepointCleanupEnabled || disposeSavepointQuietly(ctx,
sp.getLocation())) {
+ it.remove();
}
}
}
}
- private void disposeSavepointQuietly(FlinkResourceContext<CR> ctx, String
path) {
+ private boolean disposeSavepointQuietly(FlinkResourceContext<CR> ctx,
String path) {
try {
LOG.info("Disposing savepoint {}", path);
ctx.getFlinkService().disposeSavepoint(path,
ctx.getObserveConfig());
+ return true;
} catch (Exception e) {
// savepoint dispose error should not affect the deployment
- LOG.error("Exception while disposing savepoint {}", path, e);
+ LOG.error(
+ "Exception while disposing savepoint {}, will retry on
next reconcile",
+ path,
+ e);
+ return false;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
index 0eee4712..0f299274 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
@@ -318,6 +318,137 @@ public class SnapshotObserverLegacyTest extends
OperatorTestBase {
Collections.emptyList(), flinkService.getDisposedSavepoints());
}
+ @Test
+ public void testCountBasedDisposeRetainsEntryOnFailure() {
+ var deployment = TestUtils.buildApplicationCluster();
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
+ Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
+
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
2);
+ configManager.updateDefaultConfig(conf);
+
+ var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+ long futureTs = System.currentTimeMillis() * 2;
+ var sp0 =
+ new Savepoint(
+ futureTs,
+ "sp0",
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ 0L);
+ var sp1 =
+ new Savepoint(
+ futureTs + 1,
+ "sp1",
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ 1L);
+ var sp2 =
+ new Savepoint(
+ futureTs + 2,
+ "sp2",
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ 2L);
+ spInfo.updateLastSavepoint(sp0);
+ spInfo.updateLastSavepoint(sp1);
+ spInfo.updateLastSavepoint(sp2);
+
+ flinkService.setDisposeSavepointFailure(true);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
+
+ // sp0 must still be in history because dispose failed — removing it would orphan the files
+ assertThat(spInfo.getSavepointHistory()).containsExactly(sp0, sp1,
sp2);
+ assertThat(flinkService.getDisposedSavepoints()).isEmpty();
+ }
+
+ @Test
+ public void testAgeBasedDisposeRetainsEntryOnFailure() {
+ var deployment = TestUtils.buildApplicationCluster();
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
+ Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
+ conf.set(
+
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(5));
+ configManager.updateDefaultConfig(conf);
+
+ var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+ var sp1 =
+ new Savepoint(
+ 1, "sp1", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 1L);
+ var sp2 =
+ new Savepoint(
+ 2, "sp2", SnapshotTriggerType.MANUAL,
SavepointFormatType.CANONICAL, 2L);
+ spInfo.updateLastSavepoint(sp1);
+ spInfo.updateLastSavepoint(sp2);
+
+ flinkService.setDisposeSavepointFailure(true);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
+
+ // sp1 must still be in history because dispose failed — removing it would orphan the files
+ assertThat(spInfo.getSavepointHistory()).containsExactly(sp1, sp2);
+ assertThat(flinkService.getDisposedSavepoints()).isEmpty();
+ }
+
+ @Test
+ public void testDisposeRetryOnSubsequentReconcile() {
+ var deployment = TestUtils.buildApplicationCluster();
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
+ Configuration conf = new Configuration();
+ conf.set(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED,
false);
+
conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
2);
+ configManager.updateDefaultConfig(conf);
+
+ var spInfo = deployment.getStatus().getJobStatus().getSavepointInfo();
+ long futureTs = System.currentTimeMillis() * 2;
+ var sp0 =
+ new Savepoint(
+ futureTs,
+ "sp0",
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ 0L);
+ var sp1 =
+ new Savepoint(
+ futureTs + 1,
+ "sp1",
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ 1L);
+ var sp2 =
+ new Savepoint(
+ futureTs + 2,
+ "sp2",
+ SnapshotTriggerType.MANUAL,
+ SavepointFormatType.CANONICAL,
+ 2L);
+ spInfo.updateLastSavepoint(sp0);
+ spInfo.updateLastSavepoint(sp1);
+ spInfo.updateLastSavepoint(sp2);
+
+ // First reconcile: dispose fails (job is down), entry must be retained
+ flinkService.setDisposeSavepointFailure(true);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
+ assertThat(spInfo.getSavepointHistory()).containsExactly(sp0, sp1,
sp2);
+ assertThat(flinkService.getDisposedSavepoints()).isEmpty();
+
+ // Second reconcile: dispose succeeds, entry must now be removed
+ flinkService.setDisposeSavepointFailure(false);
+ observer.cleanupSavepointHistoryLegacy(getResourceContext(deployment),
Set.of());
+ assertThat(spInfo.getSavepointHistory()).containsExactly(sp1, sp2);
+
assertThat(flinkService.getDisposedSavepoints()).containsExactly(sp0.getLocation());
+ }
+
@Test
public void testPeriodicSavepoint() throws Exception {
var conf = new Configuration();