This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new aa4fa0d [FLINK-26337] Avoid to load flink conf at each reconcile loop aa4fa0d is described below commit aa4fa0d9263a74318201076b165b352080f5df12 Author: 愚鲤 <yuli....@alibaba-inc.com> AuthorDate: Wed Feb 23 22:57:22 2022 +0800 [FLINK-26337] Avoid to load flink conf at each reconcile loop Closes #19 --- .../flink/kubernetes/operator/FlinkOperator.java | 12 +++++-- .../kubernetes/operator/config/DefaultConfig.java | 41 ++++++++++++++++++++++ .../controller/FlinkDeploymentController.java | 9 +++-- .../operator/metrics/OperatorMetricUtils.java | 8 ++--- .../operator/utils/FlinkConfigBuilder.java | 11 +++--- .../kubernetes/operator/utils/FlinkUtils.java | 19 ++++++++-- .../controller/FlinkDeploymentControllerTest.java | 8 ++++- .../operator/observer/JobStatusObserverTest.java | 5 +-- .../operator/reconciler/JobReconcilerTest.java | 2 +- .../operator/utils/FlinkConfigBuilderTest.java | 37 +++++++++++++------ 10 files changed, 120 insertions(+), 32 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 9484bea..9e093a1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator; +import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; @@ -41,8 +42,8 @@ public class FlinkOperator { public static void main(String... args) { LOG.info("Starting Flink Kubernetes Operator"); - OperatorMetricUtils.initOperatorMetrics( - FlinkUtils.loadConfiguration(System.getenv().get(ENV_FLINK_OPERATOR_CONF_DIR))); + DefaultConfig defaultConfig = FlinkUtils.loadDefaultConfig(); + OperatorMetricUtils.initOperatorMetrics(defaultConfig.getOperatorConfig()); DefaultKubernetesClient client = new DefaultKubernetesClient(); String namespace = client.getNamespace(); @@ -61,7 +62,12 @@ public class FlinkOperator { FlinkDeploymentController controller = new FlinkDeploymentController( - client, namespace, observer, jobReconciler, sessionReconciler); + defaultConfig, + client, + namespace, + observer, + jobReconciler, + sessionReconciler); FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller); controllerConfig.setConfigurationService(configurationService); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/DefaultConfig.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/DefaultConfig.java new file mode 100644 index 0000000..2dfe59d --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/DefaultConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.config; + +import org.apache.flink.configuration.Configuration; + +/** The container for the operator component default config. */ +public class DefaultConfig { + + private final Configuration operatorConfig; + private final Configuration defaultFlinkConfig; + + public DefaultConfig(Configuration operatorConfig, Configuration defaultFlinkConfig) { + this.operatorConfig = operatorConfig; + this.defaultFlinkConfig = defaultFlinkConfig; + } + + public Configuration getOperatorConfig() { + return operatorConfig; + } + + public Configuration getDefaultFlinkConfig() { + return defaultFlinkConfig; + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 3ab2638..7b277ae 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException; @@ -64,13 +65,16 @@ public class FlinkDeploymentController private final JobStatusObserver observer; private final JobReconciler jobReconciler; private final SessionReconciler sessionReconciler; + private final DefaultConfig defaultConfig; public FlinkDeploymentController( + DefaultConfig defaultConfig, KubernetesClient kubernetesClient, String operatorNamespace, JobStatusObserver observer, JobReconciler jobReconciler, SessionReconciler sessionReconciler) { + this.defaultConfig = defaultConfig; this.kubernetesClient = kubernetesClient; this.operatorNamespace = operatorNamespace; this.observer = observer; @@ -84,7 +88,7 @@ public class FlinkDeploymentController FlinkUtils.deleteCluster(flinkApp, kubernetesClient); IngressUtils.updateIngressRules( flinkApp, - FlinkUtils.getEffectiveConfig(flinkApp), + FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getDefaultFlinkConfig()), operatorNamespace, kubernetesClient, true); @@ -95,7 +99,8 @@ public class FlinkDeploymentController public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) { LOG.info("Reconciling {}", flinkApp.getMetadata().getName()); - Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp); + Configuration effectiveConfig = + FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getDefaultFlinkConfig()); try { boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig); if (successfulObserve) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java index 5dfe6ba..7011446 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java @@ -34,13 +34,13 @@ public class OperatorMetricUtils { private static final String ENV_OPERATOR_NAME = "OPERATOR_NAME"; private static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE"; - public static void initOperatorMetrics(Configuration configuration) { - PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); - MetricRegistry metricRegistry = createMetricRegistry(configuration, pluginManager); + public static void initOperatorMetrics(Configuration operatorConfig) { + PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(operatorConfig); + MetricRegistry metricRegistry = createMetricRegistry(operatorConfig, pluginManager); KubernetesOperatorMetricGroup operatorMetricGroup = KubernetesOperatorMetricGroup.create( metricRegistry, - configuration, + operatorConfig, System.getenv().getOrDefault(ENV_OPERATOR_NAMESPACE, "default"), System.getenv().getOrDefault(ENV_OPERATOR_NAME, "flink-operator"), System.getenv().getOrDefault(ENV_HOSTNAME, "localhost")); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java index f55a1f1..2b01b21 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java @@ -17,7 +17,6 @@ package org.apache.flink.kubernetes.operator.utils; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -51,12 +50,10 @@ public class FlinkConfigBuilder { private final FlinkDeploymentSpec spec; private final Configuration effectiveConfig; - public FlinkConfigBuilder(FlinkDeployment deploy) { + public FlinkConfigBuilder(FlinkDeployment deploy, Configuration defaultFlinkConfig) { this.deploy = deploy; this.spec = this.deploy.getSpec(); - this.effectiveConfig = - FlinkUtils.loadConfiguration( - System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR)); + this.effectiveConfig = defaultFlinkConfig; } public FlinkConfigBuilder applyImage() { @@ -167,9 +164,9 @@ public class FlinkConfigBuilder { return effectiveConfig; } - public static Configuration buildFrom(FlinkDeployment dep) + public static Configuration buildFrom(FlinkDeployment dep, Configuration defaultFlinkConf) throws IOException, URISyntaxException { - return new FlinkConfigBuilder(dep) + return new FlinkConfigBuilder(dep, defaultFlinkConf) .applyFlinkConfiguration() .applyImage() .applyImagePullPolicy() diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index 5c5e71b..b9eec3b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -17,8 +17,11 @@ package org.apache.flink.kubernetes.operator.utils; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.operator.FlinkOperator; +import org.apache.flink.kubernetes.operator.config.DefaultConfig; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import com.fasterxml.jackson.databind.JsonNode; @@ -38,9 +41,21 @@ public class FlinkUtils { private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class); private static final ObjectMapper MAPPER = new ObjectMapper(); - public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) { + public static DefaultConfig loadDefaultConfig() { + // TODO refactor after FLINK-26332 + Configuration operatorConfig = + FlinkUtils.loadConfiguration( + System.getenv().get(FlinkOperator.ENV_FLINK_OPERATOR_CONF_DIR)); + Configuration flinkDefaultConfig = + FlinkUtils.loadConfiguration(System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR)); + return new DefaultConfig(operatorConfig, flinkDefaultConfig); + } + + public static Configuration getEffectiveConfig( + FlinkDeployment flinkApp, Configuration defaultFlinkConfig) { try { - final Configuration effectiveConfig = FlinkConfigBuilder.buildFrom(flinkApp); + final Configuration effectiveConfig = + FlinkConfigBuilder.buildFrom(flinkApp, defaultFlinkConfig); LOG.debug("Effective config: {}", effectiveConfig); return effectiveConfig; } catch (Exception e) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 730bc80..25b264b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; @@ -100,6 +101,11 @@ public class FlinkDeploymentControllerTest { SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService); return new FlinkDeploymentController( - null, "test", observer, jobReconciler, sessionReconciler); + FlinkUtils.loadDefaultConfig(), + null, + "test", + observer, + jobReconciler, + sessionReconciler); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java index 07a28f6..398b759 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java @@ -46,7 +46,8 @@ public class JobStatusObserverTest { .setLastReconciledSpec(deployment.getSpec()); assertTrue( observer.observeFlinkJobStatus( - deployment, FlinkUtils.getEffectiveConfig(deployment))); + deployment, + FlinkUtils.getEffectiveConfig(deployment, new Configuration()))); } @Test @@ -54,7 +55,7 @@ public class JobStatusObserverTest { TestingFlinkService flinkService = new TestingFlinkService(); JobStatusObserver observer = new JobStatusObserver(flinkService); FlinkDeployment deployment = TestUtils.buildApplicationCluster(); - Configuration conf = FlinkUtils.getEffectiveConfig(deployment); + Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); assertTrue(observer.observeFlinkJobStatus(deployment, conf)); deployment.setStatus(new FlinkDeploymentStatus()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java index 5d211a4..ab4a804 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java @@ -43,7 +43,7 @@ public class JobReconcilerTest { JobReconciler reconciler = new JobReconciler(null, flinkService); FlinkDeployment deployment = TestUtils.buildApplicationCluster(); - Configuration config = FlinkUtils.getEffectiveConfig(deployment); + Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration()); reconciler.reconcile("test", deployment, config); List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java index 86c06d7..ad9e798 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java @@ -75,14 +75,16 @@ public class FlinkConfigBuilderTest { @Test public void testApplyImage() { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyImage().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()).applyImage().build(); Assert.assertEquals(IMAGE, configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE)); } @Test public void testApplyImagePolicy() { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyImagePullPolicy().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyImagePullPolicy() + .build(); Assert.assertEquals( IMAGE_POLICY, configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString()); @@ -91,14 +93,18 @@ public class FlinkConfigBuilderTest { @Test public void testApplyFlinkConfiguration() { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyFlinkConfiguration() + .build(); Assert.assertEquals(2, (int) configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)); } @Test public void testApplyCommonPodTemplate() throws Exception { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyCommonPodTemplate().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyCommonPodTemplate() + .build(); final Pod jmPod = OBJECT_MAPPER.readValue( new File( @@ -118,7 +124,9 @@ public class FlinkConfigBuilderTest { @Test public void testApplyIngressDomain() { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyIngressDomain().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyIngressDomain() + .build(); Assert.assertEquals( KubernetesConfigOptions.ServiceExposedType.ClusterIP, configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)); @@ -127,7 +135,9 @@ public class FlinkConfigBuilderTest { @Test public void testApplyServiceAccount() { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyServiceAccount().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyServiceAccount() + .build(); Assert.assertEquals( SERVICE_ACCOUNT, configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)); @@ -136,7 +146,9 @@ public class FlinkConfigBuilderTest { @Test public void testApplyJobManagerSpec() throws Exception { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyJobManagerSpec().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyJobManagerSpec() + .build(); final Pod jmPod = OBJECT_MAPPER.readValue( new File( @@ -160,7 +172,9 @@ public class FlinkConfigBuilderTest { deploymentClone.getSpec().setPodTemplate(null); final Configuration configuration = - new FlinkConfigBuilder(deploymentClone).applyTaskManagerSpec().build(); + new FlinkConfigBuilder(deploymentClone, new Configuration()) + .applyTaskManagerSpec() + .build(); final Pod tmPod = OBJECT_MAPPER.readValue( new File( @@ -178,7 +192,9 @@ public class FlinkConfigBuilderTest { @Test public void testApplyJobOrSessionSpec() throws Exception { final Configuration configuration = - new FlinkConfigBuilder(flinkDeployment).applyJobOrSessionSpec().build(); + new FlinkConfigBuilder(flinkDeployment, new Configuration()) + .applyJobOrSessionSpec() + .build(); Assert.assertEquals( KubernetesDeploymentTarget.APPLICATION.getName(), configuration.get(DeploymentOptions.TARGET)); @@ -188,7 +204,8 @@ public class FlinkConfigBuilderTest { @Test public void testBuildFrom() throws Exception { - final Configuration configuration = FlinkConfigBuilder.buildFrom(flinkDeployment); + final Configuration configuration = + FlinkConfigBuilder.buildFrom(flinkDeployment, new Configuration()); final String namespace = flinkDeployment.getMetadata().getNamespace(); final String clusterId = flinkDeployment.getMetadata().getName(); // Most configs have been tested by previous unit tests, thus we only verify the namespace