This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 2642b35 [FLINK-26660] Specify eventsource name when watching multiple namespaces 2642b35 is described below commit 2642b35ee1b3976556fc22284b97990f36f49e85 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Tue Mar 15 17:53:47 2022 +0100 [FLINK-26660] Specify eventsource name when watching multiple namespaces --- .../flink/kubernetes/operator/FlinkOperator.java | 3 +- .../config/FlinkOperatorConfiguration.java | 9 ++- .../operator/controller/FlinkControllerConfig.java | 19 ++--- .../kubernetes/operator/observer/BaseObserver.java | 10 ++- .../kubernetes/operator/utils/OperatorUtils.java | 20 ++++- .../controller/FlinkDeploymentControllerTest.java | 28 ++----- .../operator/observer/SessionObserverTest.java | 94 ++++++++++++++++++++++ 7 files changed, 145 insertions(+), 38 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index f3a00e4..d0bc217 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -74,7 +74,8 @@ public class FlinkOperator { reconcilerFactory, observerFactory); - FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller); + FlinkControllerConfig controllerConfig = + new FlinkControllerConfig(controller, operatorConfiguration.getWatchedNamespaces()); controller.setControllerConfig(controllerConfig); controllerConfig.setConfigurationService(configurationService); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index ea90ec5..6e25075 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -20,9 +20,12 @@ package org.apache.flink.kubernetes.operator.config; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.utils.EnvUtils; +import org.apache.flink.kubernetes.operator.utils.OperatorUtils; import lombok.Value; +import java.util.Set; + /** Configuration class for operator. */ @Value public class FlinkOperatorConfiguration { @@ -32,6 +35,7 @@ public class FlinkOperatorConfiguration { int restApiReadyDelaySeconds; int savepointTriggerGracePeriodSeconds; String flinkServiceHostOverride; + Set<String> watchedNamespaces; public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { int reconcileIntervalSeconds = @@ -57,11 +61,14 @@ public class FlinkOperatorConfiguration { flinkServiceHostOverride = "localhost"; } + Set<String> watchedNamespaces = OperatorUtils.getWatchedNamespaces(); + return new FlinkOperatorConfiguration( reconcileIntervalSeconds, progressCheckIntervalSeconds, restApiReadyDelaySeconds, savepointTriggerGracePeriodSeconds, - flinkServiceHostOverride); + flinkServiceHostOverride, + watchedNamespaces); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java index ba8176a..49b2b27 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java @@ -18,33 +18,24 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.utils.EnvUtils; import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration; -import org.apache.commons.lang3.StringUtils; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; /** Custom config for {@link FlinkDeploymentController}. */ public class FlinkControllerConfig extends AnnotationConfiguration<FlinkDeployment> { - private static final String NAMESPACES_SPLITTER_KEY = ","; + private final Set<String> watchedNamespaces; - public FlinkControllerConfig(FlinkDeploymentController reconciler) { + public FlinkControllerConfig( + FlinkDeploymentController reconciler, Set<String> watchedNamespaces) { super(reconciler); + this.watchedNamespaces = watchedNamespaces; } @Override public Set<String> getNamespaces() { - String watchedNamespaces = EnvUtils.get(EnvUtils.ENV_WATCHED_NAMESPACES); - - if (StringUtils.isEmpty(watchedNamespaces)) { - return Collections.emptySet(); - } else { - return new HashSet<>(Arrays.asList(watchedNamespaces.split(NAMESPACES_SPLITTER_KEY))); - } + return watchedNamespaces; } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java index 2dee79e..180cb6f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java @@ -66,7 +66,7 @@ public abstract class BaseObserver implements Observer { return; } - Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class); + Optional<Deployment> deployment = getSecondaryResource(flinkApp, context); if (deployment.isPresent()) { DeploymentStatus status = deployment.get().getStatus(); DeploymentSpec spec = deployment.get().getSpec(); @@ -111,6 +111,14 @@ public abstract class BaseObserver implements Observer { deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); } + private Optional<Deployment> getSecondaryResource(FlinkDeployment flinkApp, Context context) { + return context.getSecondaryResource( + Deployment.class, + operatorConfiguration.getWatchedNamespaces().size() > 1 + ? flinkApp.getMetadata().getNamespace() + : null); + } + protected boolean isClusterReady(FlinkDeployment dep) { return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java index 48e55ee..28dc8fc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java @@ -28,10 +28,18 @@ import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; /** Operator SDK related utility functions. */ public class OperatorUtils { + private static final String NAMESPACES_SPLITTER_KEY = ","; + public static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource( KubernetesClient kubernetesClient, String namespace) { return createJmDepInformerEventSource( @@ -57,8 +65,18 @@ public class OperatorUtils { return new InformerEventSource<>(informer, Mappers.fromLabel(Constants.LABEL_APP_KEY)) { @Override public String name() { - return super.name() + "-" + name; + return name; } }; } + + public static Set<String> getWatchedNamespaces() { + String watchedNamespaces = EnvUtils.get(EnvUtils.ENV_WATCHED_NAMESPACES); + + if (StringUtils.isEmpty(watchedNamespaces)) { + return Collections.emptySet(); + } else { + return new HashSet<>(Arrays.asList(watchedNamespaces.split(NAMESPACES_SPLITTER_KEY))); + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index d9ff90b..9f011de 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test; import java.net.HttpURLConnection; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -66,7 +67,7 @@ public class FlinkDeploymentControllerTest { private final Context context = TestUtils.createContextWithReadyJobManagerDeployment(); private final FlinkOperatorConfiguration operatorConfiguration = - new FlinkOperatorConfiguration(1, 2, 3, 4, null); + new FlinkOperatorConfiguration(1, 2, 3, 4, null, Collections.emptySet()); private TestingFlinkService flinkService; private FlinkDeploymentController testController; @@ -387,33 +388,19 @@ public class FlinkDeploymentControllerTest { public void testPrepareEventSource() { // Test watch all testController.setControllerConfig( - new FlinkControllerConfig(testController) { - @Override - public Set<String> getEffectiveNamespaces() { - return Set.of(); - } - }); + new FlinkControllerConfig(testController, Collections.emptySet())); List<EventSource> eventSources = testController.prepareEventSources(null); assertEquals(1, eventSources.size()); - assertTrue(eventSources.get(0).name().endsWith("-all")); + assertEquals("all", eventSources.get(0).name()); // Test watch namespaces Set<String> namespaces = Set.of("ns1", "ns2", "ns3"); - testController.setControllerConfig( - new FlinkControllerConfig(testController) { - @Override - public Set<String> getEffectiveNamespaces() { - return namespaces; - } - }); + testController.setControllerConfig(new FlinkControllerConfig(testController, namespaces)); eventSources = testController.prepareEventSources(null); assertEquals(3, eventSources.size()); assertEquals( namespaces, - eventSources.stream() - .map(EventSource::name) - .map(s -> s.substring(s.length() - 3)) - .collect(Collectors.toSet())); + eventSources.stream().map(EventSource::name).collect(Collectors.toSet())); } private FlinkDeploymentController createTestController( @@ -429,7 +416,8 @@ public class FlinkDeploymentControllerTest { new ReconcilerFactory( kubernetesClient, flinkService, operatorConfiguration), new ObserverFactory(flinkService, operatorConfiguration)); - controller.setControllerConfig(new FlinkControllerConfig(controller)); + controller.setControllerConfig( + new FlinkControllerConfig(controller, Collections.emptySet())); return controller; } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java index 6e7cb48..1721671 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java @@ -26,10 +26,20 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; +import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; /** {@link SessionObserver} unit tests. */ public class SessionObserverTest { @@ -66,4 +76,88 @@ public class SessionObserverTest { JobManagerDeploymentStatus.READY, deployment.getStatus().getJobManagerDeploymentStatus()); } + + @Test + public void testWatchMultipleNamespaces() { + FlinkService flinkService = new TestingFlinkService(); + FlinkDeployment deployment = TestUtils.buildSessionCluster(); + deployment + .getStatus() + .getReconciliationStatus() + .setLastReconciledSpec(deployment.getSpec()); + + FlinkOperatorConfiguration allNsConfig = + new FlinkOperatorConfiguration(1, 2, 3, 4, null, Collections.emptySet()); + FlinkOperatorConfiguration specificNsConfig = + new FlinkOperatorConfiguration( + 1, 2, 3, 4, null, Set.of(deployment.getMetadata().getNamespace())); + FlinkOperatorConfiguration multipleNsConfig = + new FlinkOperatorConfiguration( + 1, 2, 3, 4, null, Set.of(deployment.getMetadata().getNamespace(), "ns")); + + Deployment k8sDeployment = new Deployment(); + k8sDeployment.setSpec(new DeploymentSpec()); + k8sDeployment.setStatus(new DeploymentStatus()); + + AtomicInteger secondaryResourceAccessed = new AtomicInteger(0); + Observer allNsObserver = new SessionObserver(flinkService, allNsConfig); + allNsObserver.observe( + deployment, + new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) { + assertNull(s); + secondaryResourceAccessed.addAndGet(1); + return Optional.of((T) k8sDeployment); + } + }, + FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + + assertEquals(1, secondaryResourceAccessed.get()); + + Observer specificNsObserver = new SessionObserver(flinkService, specificNsConfig); + specificNsObserver.observe( + deployment, + new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) { + assertNull(s); + secondaryResourceAccessed.addAndGet(1); + return Optional.of((T) k8sDeployment); + } + }, + FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + + assertEquals(2, secondaryResourceAccessed.get()); + + Observer multipleNsObserver = new SessionObserver(flinkService, multipleNsConfig); + multipleNsObserver.observe( + deployment, + new Context() { + @Override + public Optional<RetryInfo> getRetryInfo() { + return Optional.empty(); + } + + @Override + public <T> Optional<T> getSecondaryResource(Class<T> aClass, String s) { + assertEquals(deployment.getMetadata().getNamespace(), s); + secondaryResourceAccessed.addAndGet(1); + return Optional.of((T) k8sDeployment); + } + }, + FlinkUtils.getEffectiveConfig(deployment, new Configuration())); + + assertEquals(3, secondaryResourceAccessed.get()); + } }