This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new d10918c6 [FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job d10918c6 is described below commit d10918c626c19964635a340fd99a627691fedf54 Author: Mate Czagany <czmat...@gmail.com> AuthorDate: Sat Aug 10 17:08:40 2024 +0200 [FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job --- .../operator/utils/FlinkStateSnapshotUtils.java | 16 ++++++++++++++-- .../flink/kubernetes/operator/TestUtils.java | 7 ++++++- .../utils/FlinkStateSnapshotUtilsTest.java | 22 ++++++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java index c74776d9..42927738 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java @@ -122,10 +122,12 @@ public class FlinkStateSnapshotUtils { protected static FlinkStateSnapshot createFlinkStateSnapshot( KubernetesClient kubernetesClient, + String namespace, String name, FlinkStateSnapshotSpec spec, SnapshotTriggerType triggerType) { var metadata = new ObjectMeta(); + metadata.setNamespace(namespace); metadata.setName(name); metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name()); @@ -169,7 +171,12 @@ public class FlinkStateSnapshotUtils { .build(); var resourceName = getFlinkStateSnapshotName(SAVEPOINT, triggerType, resource); - return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType); + return createFlinkStateSnapshot( + kubernetesClient, + resource.getMetadata().getNamespace(), + resourceName, + snapshotSpec, + triggerType); } /** @@ -191,7 +198,12 @@ public class FlinkStateSnapshotUtils { .build(); var resourceName = getFlinkStateSnapshotName(CHECKPOINT, triggerType, resource); - return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType); + return createFlinkStateSnapshot( + kubernetesClient, + resource.getMetadata().getNamespace(), + resourceName, + snapshotSpec, + triggerType); } /** diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index e450c236..81b4e7e6 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -286,7 +286,12 @@ public class TestUtils extends BaseTestUtils { public static <CR extends AbstractFlinkResource<?, ?>> List<FlinkStateSnapshot> getFlinkStateSnapshotsForResource( KubernetesClient kubernetesClient, CR resource) { - return kubernetesClient.resources(FlinkStateSnapshot.class).list().getItems().stream() + return kubernetesClient + .resources(FlinkStateSnapshot.class) + .inAnyNamespace() + .list() + .getItems() + .stream() .filter( s -> s.getSpec() diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java index 7f658d8d..bf37ed63 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java @@ -229,6 +229,28 @@ public class FlinkStateSnapshotUtilsTest { false); } + @Test + public void testCreateSnapshotInSameNamespace() { + var namespace = "different-namespace"; + var deployment = initDeployment(); + deployment.getMetadata().setNamespace(namespace); + + var savepoint = + FlinkStateSnapshotUtils.createSavepointResource( + client, + deployment, + SAVEPOINT_PATH, + SnapshotTriggerType.PERIODIC, + SavepointFormatType.CANONICAL, + true); + assertThat(savepoint.getMetadata().getNamespace()).isEqualTo(namespace); + + var checkpoint = + FlinkStateSnapshotUtils.createCheckpointResource( + client, deployment, SnapshotTriggerType.MANUAL); + assertThat(checkpoint.getMetadata().getNamespace()).isEqualTo(namespace); + } + @Test public void testCreateCheckpointResource() { var deployment = initDeployment();