This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new aa4fa0d  [FLINK-26337] Avoid to load flink conf at each reconcile loop
aa4fa0d is described below

commit aa4fa0d9263a74318201076b165b352080f5df12
Author: 愚鲤 <yuli....@alibaba-inc.com>
AuthorDate: Wed Feb 23 22:57:22 2022 +0800

    [FLINK-26337] Avoid to load flink conf at each reconcile loop
    
    Closes #19
---
 .../flink/kubernetes/operator/FlinkOperator.java   | 12 +++++--
 .../kubernetes/operator/config/DefaultConfig.java  | 41 ++++++++++++++++++++++
 .../controller/FlinkDeploymentController.java      |  9 +++--
 .../operator/metrics/OperatorMetricUtils.java      |  8 ++---
 .../operator/utils/FlinkConfigBuilder.java         | 11 +++---
 .../kubernetes/operator/utils/FlinkUtils.java      | 19 ++++++++--
 .../controller/FlinkDeploymentControllerTest.java  |  8 ++++-
 .../operator/observer/JobStatusObserverTest.java   |  5 +--
 .../operator/reconciler/JobReconcilerTest.java     |  2 +-
 .../operator/utils/FlinkConfigBuilderTest.java     | 37 +++++++++++++------
 10 files changed, 120 insertions(+), 32 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 9484bea..9e093a1 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
@@ -41,8 +42,8 @@ public class FlinkOperator {
     public static void main(String... args) {
 
         LOG.info("Starting Flink Kubernetes Operator");
-        OperatorMetricUtils.initOperatorMetrics(
-                
FlinkUtils.loadConfiguration(System.getenv().get(ENV_FLINK_OPERATOR_CONF_DIR)));
+        DefaultConfig defaultConfig = FlinkUtils.loadDefaultConfig();
+        
OperatorMetricUtils.initOperatorMetrics(defaultConfig.getOperatorConfig());
 
         DefaultKubernetesClient client = new DefaultKubernetesClient();
         String namespace = client.getNamespace();
@@ -61,7 +62,12 @@ public class FlinkOperator {
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
-                        client, namespace, observer, jobReconciler, 
sessionReconciler);
+                        defaultConfig,
+                        client,
+                        namespace,
+                        observer,
+                        jobReconciler,
+                        sessionReconciler);
 
         FlinkControllerConfig controllerConfig = new 
FlinkControllerConfig(controller);
         controllerConfig.setConfigurationService(configurationService);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/DefaultConfig.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/DefaultConfig.java
new file mode 100644
index 0000000..2dfe59d
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/DefaultConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.Configuration;
+
+/** The container for the operator component default config. */
+public class DefaultConfig {
+
+    private final Configuration operatorConfig;
+    private final Configuration defaultFlinkConfig;
+
+    public DefaultConfig(Configuration operatorConfig, Configuration 
defaultFlinkConfig) {
+        this.operatorConfig = operatorConfig;
+        this.defaultFlinkConfig = defaultFlinkConfig;
+    }
+
+    public Configuration getOperatorConfig() {
+        return operatorConfig;
+    }
+
+    public Configuration getDefaultFlinkConfig() {
+        return defaultFlinkConfig;
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 3ab2638..7b277ae 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
@@ -64,13 +65,16 @@ public class FlinkDeploymentController
     private final JobStatusObserver observer;
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
+    private final DefaultConfig defaultConfig;
 
     public FlinkDeploymentController(
+            DefaultConfig defaultConfig,
             KubernetesClient kubernetesClient,
             String operatorNamespace,
             JobStatusObserver observer,
             JobReconciler jobReconciler,
             SessionReconciler sessionReconciler) {
+        this.defaultConfig = defaultConfig;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
         this.observer = observer;
@@ -84,7 +88,7 @@ public class FlinkDeploymentController
         FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
         IngressUtils.updateIngressRules(
                 flinkApp,
-                FlinkUtils.getEffectiveConfig(flinkApp),
+                FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getDefaultFlinkConfig()),
                 operatorNamespace,
                 kubernetesClient,
                 true);
@@ -95,7 +99,8 @@ public class FlinkDeploymentController
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, 
Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
 
-        Configuration effectiveConfig = 
FlinkUtils.getEffectiveConfig(flinkApp);
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getDefaultFlinkConfig());
         try {
             boolean successfulObserve = 
observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
             if (successfulObserve) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 5dfe6ba..7011446 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -34,13 +34,13 @@ public class OperatorMetricUtils {
     private static final String ENV_OPERATOR_NAME = "OPERATOR_NAME";
     private static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE";
 
-    public static void initOperatorMetrics(Configuration configuration) {
-        PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration);
-        MetricRegistry metricRegistry = createMetricRegistry(configuration, 
pluginManager);
+    public static void initOperatorMetrics(Configuration operatorConfig) {
+        PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(operatorConfig);
+        MetricRegistry metricRegistry = createMetricRegistry(operatorConfig, 
pluginManager);
         KubernetesOperatorMetricGroup operatorMetricGroup =
                 KubernetesOperatorMetricGroup.create(
                         metricRegistry,
-                        configuration,
+                        operatorConfig,
                         System.getenv().getOrDefault(ENV_OPERATOR_NAMESPACE, 
"default"),
                         System.getenv().getOrDefault(ENV_OPERATOR_NAME, 
"flink-operator"),
                         System.getenv().getOrDefault(ENV_HOSTNAME, 
"localhost"));
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index f55a1f1..2b01b21 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -51,12 +50,10 @@ public class FlinkConfigBuilder {
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
 
-    public FlinkConfigBuilder(FlinkDeployment deploy) {
+    public FlinkConfigBuilder(FlinkDeployment deploy, Configuration 
defaultFlinkConfig) {
         this.deploy = deploy;
         this.spec = this.deploy.getSpec();
-        this.effectiveConfig =
-                FlinkUtils.loadConfiguration(
-                        
System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR));
+        this.effectiveConfig = defaultFlinkConfig;
     }
 
     public FlinkConfigBuilder applyImage() {
@@ -167,9 +164,9 @@ public class FlinkConfigBuilder {
         return effectiveConfig;
     }
 
-    public static Configuration buildFrom(FlinkDeployment dep)
+    public static Configuration buildFrom(FlinkDeployment dep, Configuration 
defaultFlinkConf)
             throws IOException, URISyntaxException {
-        return new FlinkConfigBuilder(dep)
+        return new FlinkConfigBuilder(dep, defaultFlinkConf)
                 .applyFlinkConfiguration()
                 .applyImage()
                 .applyImagePullPolicy()
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 5c5e71b..b9eec3b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.FlinkOperator;
+import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -38,9 +41,21 @@ public class FlinkUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkUtils.class);
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
-    public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
+    public static DefaultConfig loadDefaultConfig() {
+        // TODO refactor after FLINK-26332
+        Configuration operatorConfig =
+                FlinkUtils.loadConfiguration(
+                        
System.getenv().get(FlinkOperator.ENV_FLINK_OPERATOR_CONF_DIR));
+        Configuration flinkDefaultConfig =
+                
FlinkUtils.loadConfiguration(System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR));
+        return new DefaultConfig(operatorConfig, flinkDefaultConfig);
+    }
+
+    public static Configuration getEffectiveConfig(
+            FlinkDeployment flinkApp, Configuration defaultFlinkConfig) {
         try {
-            final Configuration effectiveConfig = 
FlinkConfigBuilder.buildFrom(flinkApp);
+            final Configuration effectiveConfig =
+                    FlinkConfigBuilder.buildFrom(flinkApp, defaultFlinkConfig);
             LOG.debug("Effective config: {}", effectiveConfig);
             return effectiveConfig;
         } catch (Exception e) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 730bc80..25b264b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
@@ -100,6 +101,11 @@ public class FlinkDeploymentControllerTest {
         SessionReconciler sessionReconciler = new SessionReconciler(null, 
flinkService);
 
         return new FlinkDeploymentController(
-                null, "test", observer, jobReconciler, sessionReconciler);
+                FlinkUtils.loadDefaultConfig(),
+                null,
+                "test",
+                observer,
+                jobReconciler,
+                sessionReconciler);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 07a28f6..398b759 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -46,7 +46,8 @@ public class JobStatusObserverTest {
                 .setLastReconciledSpec(deployment.getSpec());
         assertTrue(
                 observer.observeFlinkJobStatus(
-                        deployment, 
FlinkUtils.getEffectiveConfig(deployment)));
+                        deployment,
+                        FlinkUtils.getEffectiveConfig(deployment, new 
Configuration())));
     }
 
     @Test
@@ -54,7 +55,7 @@ public class JobStatusObserverTest {
         TestingFlinkService flinkService = new TestingFlinkService();
         JobStatusObserver observer = new JobStatusObserver(flinkService);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        Configuration conf = FlinkUtils.getEffectiveConfig(deployment);
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
 
         assertTrue(observer.observeFlinkJobStatus(deployment, conf));
         deployment.setStatus(new FlinkDeploymentStatus());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 5d211a4..ab4a804 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -43,7 +43,7 @@ public class JobReconcilerTest {
 
         JobReconciler reconciler = new JobReconciler(null, flinkService);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        Configuration config = FlinkUtils.getEffectiveConfig(deployment);
+        Configuration config = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
 
         reconciler.reconcile("test", deployment, config);
         List<Tuple2<String, JobStatusMessage>> runningJobs = 
flinkService.listJobs();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 86c06d7..ad9e798 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -75,14 +75,16 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyImage() {
         final Configuration configuration =
-                new FlinkConfigBuilder(flinkDeployment).applyImage().build();
+                new FlinkConfigBuilder(flinkDeployment, new 
Configuration()).applyImage().build();
         Assert.assertEquals(IMAGE, 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
     }
 
     @Test
     public void testApplyImagePolicy() {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyImagePullPolicy().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyImagePullPolicy()
+                        .build();
         Assert.assertEquals(
                 IMAGE_POLICY,
                 
configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
@@ -91,14 +93,18 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyFlinkConfiguration() {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyFlinkConfiguration()
+                        .build();
         Assert.assertEquals(2, (int) 
configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
     }
 
     @Test
     public void testApplyCommonPodTemplate() throws Exception {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyCommonPodTemplate().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyCommonPodTemplate()
+                        .build();
         final Pod jmPod =
                 OBJECT_MAPPER.readValue(
                         new File(
@@ -118,7 +124,9 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyIngressDomain() {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyIngressDomain().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyIngressDomain()
+                        .build();
         Assert.assertEquals(
                 KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                 
configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
@@ -127,7 +135,9 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyServiceAccount() {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyServiceAccount().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyServiceAccount()
+                        .build();
         Assert.assertEquals(
                 SERVICE_ACCOUNT,
                 
configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
@@ -136,7 +146,9 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyJobManagerSpec() throws Exception {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyJobManagerSpec().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyJobManagerSpec()
+                        .build();
         final Pod jmPod =
                 OBJECT_MAPPER.readValue(
                         new File(
@@ -160,7 +172,9 @@ public class FlinkConfigBuilderTest {
         deploymentClone.getSpec().setPodTemplate(null);
 
         final Configuration configuration =
-                new 
FlinkConfigBuilder(deploymentClone).applyTaskManagerSpec().build();
+                new FlinkConfigBuilder(deploymentClone, new Configuration())
+                        .applyTaskManagerSpec()
+                        .build();
         final Pod tmPod =
                 OBJECT_MAPPER.readValue(
                         new File(
@@ -178,7 +192,9 @@ public class FlinkConfigBuilderTest {
     @Test
     public void testApplyJobOrSessionSpec() throws Exception {
         final Configuration configuration =
-                new 
FlinkConfigBuilder(flinkDeployment).applyJobOrSessionSpec().build();
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyJobOrSessionSpec()
+                        .build();
         Assert.assertEquals(
                 KubernetesDeploymentTarget.APPLICATION.getName(),
                 configuration.get(DeploymentOptions.TARGET));
@@ -188,7 +204,8 @@ public class FlinkConfigBuilderTest {
 
     @Test
     public void testBuildFrom() throws Exception {
-        final Configuration configuration = 
FlinkConfigBuilder.buildFrom(flinkDeployment);
+        final Configuration configuration =
+                FlinkConfigBuilder.buildFrom(flinkDeployment, new 
Configuration());
         final String namespace = flinkDeployment.getMetadata().getNamespace();
         final String clusterId = flinkDeployment.getMetadata().getName();
         // Most configs have been tested by previous unit tests, thus we only 
verify the namespace

Reply via email to