This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 2642b35  [FLINK-26660] Specify eventsource name when watching multiple namespaces
2642b35 is described below

commit 2642b35ee1b3976556fc22284b97990f36f49e85
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue Mar 15 17:53:47 2022 +0100

    [FLINK-26660] Specify eventsource name when watching multiple namespaces
---
 .../flink/kubernetes/operator/FlinkOperator.java   |  3 +-
 .../config/FlinkOperatorConfiguration.java         |  9 ++-
 .../operator/controller/FlinkControllerConfig.java | 19 ++---
 .../kubernetes/operator/observer/BaseObserver.java | 10 ++-
 .../kubernetes/operator/utils/OperatorUtils.java   | 20 ++++-
 .../controller/FlinkDeploymentControllerTest.java  | 28 ++-----
 .../operator/observer/SessionObserverTest.java     | 94 ++++++++++++++++++++++
 7 files changed, 145 insertions(+), 38 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index f3a00e4..d0bc217 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -74,7 +74,8 @@ public class FlinkOperator {
                         reconcilerFactory,
                         observerFactory);
 
-        FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+        FlinkControllerConfig controllerConfig =
+                new FlinkControllerConfig(controller, operatorConfiguration.getWatchedNamespaces());
         controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index ea90ec5..6e25075 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -20,9 +20,12 @@ package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 
 import lombok.Value;
 
+import java.util.Set;
+
 /** Configuration class for operator. */
 @Value
 public class FlinkOperatorConfiguration {
@@ -32,6 +35,7 @@ public class FlinkOperatorConfiguration {
     int restApiReadyDelaySeconds;
     int savepointTriggerGracePeriodSeconds;
     String flinkServiceHostOverride;
+    Set<String> watchedNamespaces;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
         int reconcileIntervalSeconds =
@@ -57,11 +61,14 @@ public class FlinkOperatorConfiguration {
             flinkServiceHostOverride = "localhost";
         }
 
+        Set<String> watchedNamespaces = OperatorUtils.getWatchedNamespaces();
+
         return new FlinkOperatorConfiguration(
                 reconcileIntervalSeconds,
                 progressCheckIntervalSeconds,
                 restApiReadyDelaySeconds,
                 savepointTriggerGracePeriodSeconds,
-                flinkServiceHostOverride);
+                flinkServiceHostOverride,
+                watchedNamespaces);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
index ba8176a..49b2b27 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
@@ -18,33 +18,24 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 
 import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration;
-import org.apache.commons.lang3.StringUtils;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 /** Custom config for {@link FlinkDeploymentController}. */
 public class FlinkControllerConfig extends AnnotationConfiguration<FlinkDeployment> {
 
-    private static final String NAMESPACES_SPLITTER_KEY = ",";
+    private final Set<String> watchedNamespaces;
 
-    public FlinkControllerConfig(FlinkDeploymentController reconciler) {
+    public FlinkControllerConfig(
+            FlinkDeploymentController reconciler, Set<String> watchedNamespaces) {
         super(reconciler);
+        this.watchedNamespaces = watchedNamespaces;
     }
 
     @Override
     public Set<String> getNamespaces() {
-        String watchedNamespaces = EnvUtils.get(EnvUtils.ENV_WATCHED_NAMESPACES);
-
-        if (StringUtils.isEmpty(watchedNamespaces)) {
-            return Collections.emptySet();
-        } else {
-            return new HashSet<>(Arrays.asList(watchedNamespaces.split(NAMESPACES_SPLITTER_KEY)));
-        }
+        return watchedNamespaces;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
index 2dee79e..180cb6f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -66,7 +66,7 @@ public abstract class BaseObserver implements Observer {
             return;
         }
 
-        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        Optional<Deployment> deployment = getSecondaryResource(flinkApp, context);
         if (deployment.isPresent()) {
             DeploymentStatus status = deployment.get().getStatus();
             DeploymentSpec spec = deployment.get().getSpec();
@@ -111,6 +111,14 @@ public abstract class BaseObserver implements Observer {
         deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
     }
 
+    private Optional<Deployment> getSecondaryResource(FlinkDeployment flinkApp, Context context) {
+        return context.getSecondaryResource(
+                Deployment.class,
+                operatorConfiguration.getWatchedNamespaces().size() > 1
+                        ? flinkApp.getMetadata().getNamespace()
+                        : null);
+    }
+
     protected boolean isClusterReady(FlinkDeployment dep) {
         return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
index 48e55ee..28dc8fc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
@@ -28,10 +28,18 @@ import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
 import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /** Operator SDK related utility functions. */
 public class OperatorUtils {
 
+    private static final String NAMESPACES_SPLITTER_KEY = ",";
+
     public static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
             KubernetesClient kubernetesClient, String namespace) {
         return createJmDepInformerEventSource(
@@ -57,8 +65,18 @@ public class OperatorUtils {
         return new InformerEventSource<>(informer, Mappers.fromLabel(Constants.LABEL_APP_KEY)) {
             @Override
             public String name() {
-                return super.name() + "-" + name;
+                return name;
             }
         };
     }
+
+    public static Set<String> getWatchedNamespaces() {
+        String watchedNamespaces = EnvUtils.get(EnvUtils.ENV_WATCHED_NAMESPACES);
+
+        if (StringUtils.isEmpty(watchedNamespaces)) {
+            return Collections.emptySet();
+        } else {
+            return new HashSet<>(Arrays.asList(watchedNamespaces.split(NAMESPACES_SPLITTER_KEY)));
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index d9ff90b..9f011de 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test;
 
 import java.net.HttpURLConnection;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -66,7 +67,7 @@ public class FlinkDeploymentControllerTest {
 
     private final Context context = TestUtils.createContextWithReadyJobManagerDeployment();
     private final FlinkOperatorConfiguration operatorConfiguration =
-            new FlinkOperatorConfiguration(1, 2, 3, 4, null);
+            new FlinkOperatorConfiguration(1, 2, 3, 4, null, Collections.emptySet());
 
     private TestingFlinkService flinkService;
     private FlinkDeploymentController testController;
@@ -387,33 +388,19 @@ public class FlinkDeploymentControllerTest {
     public void testPrepareEventSource() {
         // Test watch all
         testController.setControllerConfig(
-                new FlinkControllerConfig(testController) {
-                    @Override
-                    public Set<String> getEffectiveNamespaces() {
-                        return Set.of();
-                    }
-                });
+                new FlinkControllerConfig(testController, Collections.emptySet()));
         List<EventSource> eventSources = testController.prepareEventSources(null);
         assertEquals(1, eventSources.size());
-        assertTrue(eventSources.get(0).name().endsWith("-all"));
+        assertEquals("all", eventSources.get(0).name());
 
         // Test watch namespaces
         Set<String> namespaces = Set.of("ns1", "ns2", "ns3");
-        testController.setControllerConfig(
-                new FlinkControllerConfig(testController) {
-                    @Override
-                    public Set<String> getEffectiveNamespaces() {
-                        return namespaces;
-                    }
-                });
+        testController.setControllerConfig(new FlinkControllerConfig(testController, namespaces));
         eventSources = testController.prepareEventSources(null);
         assertEquals(3, eventSources.size());
         assertEquals(
                 namespaces,
-                eventSources.stream()
-                        .map(EventSource::name)
-                        .map(s -> s.substring(s.length() - 3))
-                        .collect(Collectors.toSet()));
+                eventSources.stream().map(EventSource::name).collect(Collectors.toSet()));
     }
 
     private FlinkDeploymentController createTestController(
@@ -429,7 +416,8 @@ public class FlinkDeploymentControllerTest {
                         new ReconcilerFactory(
                                 kubernetesClient, flinkService, operatorConfiguration),
                         new ObserverFactory(flinkService, operatorConfiguration));
-        controller.setControllerConfig(new FlinkControllerConfig(controller));
+        controller.setControllerConfig(
+                new FlinkControllerConfig(controller, Collections.emptySet()));
         return controller;
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
index 6e7cb48..1721671 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -26,10 +26,20 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /** {@link SessionObserver} unit tests. */
 public class SessionObserverTest {
@@ -66,4 +76,88 @@ public class SessionObserverTest {
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
     }
+
+    @Test
+    public void testWatchMultipleNamespaces() {
+        FlinkService flinkService = new TestingFlinkService();
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
+
+        FlinkOperatorConfiguration allNsConfig =
+                new FlinkOperatorConfiguration(1, 2, 3, 4, null, Collections.emptySet());
+        FlinkOperatorConfiguration specificNsConfig =
+                new FlinkOperatorConfiguration(
+                        1, 2, 3, 4, null, Set.of(deployment.getMetadata().getNamespace()));
+        FlinkOperatorConfiguration multipleNsConfig =
+                new FlinkOperatorConfiguration(
+                        1, 2, 3, 4, null, Set.of(deployment.getMetadata().getNamespace(), "ns"));
+
+        Deployment k8sDeployment = new Deployment();
+        k8sDeployment.setSpec(new DeploymentSpec());
+        k8sDeployment.setStatus(new DeploymentStatus());
+
+        AtomicInteger secondaryResourceAccessed = new AtomicInteger(0);
+        Observer allNsObserver = new SessionObserver(flinkService, allNsConfig);
+        allNsObserver.observe(
+                deployment,
+                new Context() {
+                    @Override
+                    public Optional<RetryInfo> getRetryInfo() {
+                        return Optional.empty();
+                    }
+
+                    @Override
+                    public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) {
+                        assertNull(s);
+                        secondaryResourceAccessed.addAndGet(1);
+                        return Optional.of((T) k8sDeployment);
+                    }
+                },
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+        assertEquals(1, secondaryResourceAccessed.get());
+
+        Observer specificNsObserver = new SessionObserver(flinkService, specificNsConfig);
+        specificNsObserver.observe(
+                deployment,
+                new Context() {
+                    @Override
+                    public Optional<RetryInfo> getRetryInfo() {
+                        return Optional.empty();
+                    }
+
+                    @Override
+                    public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) {
+                        assertNull(s);
+                        secondaryResourceAccessed.addAndGet(1);
+                        return Optional.of((T) k8sDeployment);
+                    }
+                },
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+        assertEquals(2, secondaryResourceAccessed.get());
+
+        Observer multipleNsObserver = new SessionObserver(flinkService, multipleNsConfig);
+        multipleNsObserver.observe(
+                deployment,
+                new Context() {
+                    @Override
+                    public Optional<RetryInfo> getRetryInfo() {
+                        return Optional.empty();
+                    }
+
+                    @Override
+                    public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) {
+                        assertEquals(deployment.getMetadata().getNamespace(), s);
+                        secondaryResourceAccessed.addAndGet(1);
+                        return Optional.of((T) k8sDeployment);
+                    }
+                },
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+        assertEquals(3, secondaryResourceAccessed.get());
+    }
 }

Reply via email to