This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d10918c6 [FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job
d10918c6 is described below

commit d10918c626c19964635a340fd99a627691fedf54
Author: Mate Czagany <czmat...@gmail.com>
AuthorDate: Sat Aug 10 17:08:40 2024 +0200

    [FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job
---
 .../operator/utils/FlinkStateSnapshotUtils.java    | 16 ++++++++++++++--
 .../flink/kubernetes/operator/TestUtils.java       |  7 ++++++-
 .../utils/FlinkStateSnapshotUtilsTest.java         | 22 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
index c74776d9..42927738 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java
@@ -122,10 +122,12 @@ public class FlinkStateSnapshotUtils {
 
     protected static FlinkStateSnapshot createFlinkStateSnapshot(
             KubernetesClient kubernetesClient,
+            String namespace,
             String name,
             FlinkStateSnapshotSpec spec,
             SnapshotTriggerType triggerType) {
         var metadata = new ObjectMeta();
+        metadata.setNamespace(namespace);
         metadata.setName(name);
         metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name());
 
@@ -169,7 +171,12 @@ public class FlinkStateSnapshotUtils {
                         .build();
 
         var resourceName = getFlinkStateSnapshotName(SAVEPOINT, triggerType, resource);
-        return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType);
+        return createFlinkStateSnapshot(
+                kubernetesClient,
+                resource.getMetadata().getNamespace(),
+                resourceName,
+                snapshotSpec,
+                triggerType);
     }
 
     /**
@@ -191,7 +198,12 @@ public class FlinkStateSnapshotUtils {
                         .build();
 
         var resourceName = getFlinkStateSnapshotName(CHECKPOINT, triggerType, resource);
-        return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType);
+        return createFlinkStateSnapshot(
+                kubernetesClient,
+                resource.getMetadata().getNamespace(),
+                resourceName,
+                snapshotSpec,
+                triggerType);
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index e450c236..81b4e7e6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -286,7 +286,12 @@ public class TestUtils extends BaseTestUtils {
     public static <CR extends AbstractFlinkResource<?, ?>>
             List<FlinkStateSnapshot> getFlinkStateSnapshotsForResource(
                     KubernetesClient kubernetesClient, CR resource) {
-        return kubernetesClient.resources(FlinkStateSnapshot.class).list().getItems().stream()
+        return kubernetesClient
+                .resources(FlinkStateSnapshot.class)
+                .inAnyNamespace()
+                .list()
+                .getItems()
+                .stream()
                 .filter(
                         s ->
                                 s.getSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index 7f658d8d..bf37ed63 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -229,6 +229,28 @@ public class FlinkStateSnapshotUtilsTest {
                 false);
     }
 
+    @Test
+    public void testCreateSnapshotInSameNamespace() {
+        var namespace = "different-namespace";
+        var deployment = initDeployment();
+        deployment.getMetadata().setNamespace(namespace);
+
+        var savepoint =
+                FlinkStateSnapshotUtils.createSavepointResource(
+                        client,
+                        deployment,
+                        SAVEPOINT_PATH,
+                        SnapshotTriggerType.PERIODIC,
+                        SavepointFormatType.CANONICAL,
+                        true);
+        assertThat(savepoint.getMetadata().getNamespace()).isEqualTo(namespace);
+
+        var checkpoint =
+                FlinkStateSnapshotUtils.createCheckpointResource(
+                        client, deployment, SnapshotTriggerType.MANUAL);
+        assertThat(checkpoint.getMetadata().getNamespace()).isEqualTo(namespace);
+    }
+
     @Test
     public void testCreateCheckpointResource() {
         var deployment = initDeployment();

Reply via email to