This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 3bee794 Use localhost address when running outside k8s 3bee794 is described below commit 3bee794e3283091bb73a7aad873167c473a8d97c Author: Thomas Weise <t...@apache.org> AuthorDate: Fri Mar 11 11:00:23 2022 -0800 Use localhost address when running outside k8s --- .../org/apache/flink/kubernetes/operator/FlinkOperator.java | 4 +++- .../operator/config/FlinkOperatorConfiguration.java | 11 ++++++++++- .../flink/kubernetes/operator/service/FlinkService.java | 13 +++++++++++-- .../flink/kubernetes/operator/TestingFlinkService.java | 2 +- .../operator/controller/FlinkDeploymentControllerTest.java | 2 +- .../flink/kubernetes/operator/service/FlinkServiceTest.java | 2 +- 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 1125b65..f3a00e4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -54,9 +54,11 @@ public class FlinkOperator { DefaultConfigurationService configurationService = DefaultConfigurationService.instance(); Operator operator = new Operator(client, configurationService); - FlinkService flinkService = new FlinkService(client); FlinkOperatorConfiguration operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig()); + + FlinkService flinkService = new FlinkService(client, operatorConfiguration); + FlinkDeploymentValidator validator = new DefaultDeploymentValidator(); ReconcilerFactory reconcilerFactory = new ReconcilerFactory(client, flinkService, operatorConfiguration); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java index 1ffff66..ea90ec5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.config; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.utils.EnvUtils; import lombok.Value; @@ -30,6 +31,7 @@ public class FlinkOperatorConfiguration { int progressCheckIntervalSeconds; int restApiReadyDelaySeconds; int savepointTriggerGracePeriodSeconds; + String flinkServiceHostOverride; public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { int reconcileIntervalSeconds = @@ -49,10 +51,17 @@ public class FlinkOperatorConfiguration { OperatorConfigOptions .OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC); + String flinkServiceHostOverride = null; + if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) { + // not running in k8s, simplify local development + flinkServiceHostOverride = "localhost"; + } + return new FlinkOperatorConfiguration( reconcileIntervalSeconds, progressCheckIntervalSeconds, restApiReadyDelaySeconds, - savepointTriggerGracePeriodSeconds); + savepointTriggerGracePeriodSeconds, + flinkServiceHostOverride); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index f3e8655..4dbd1e6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; @@ -52,6 +53,7 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMes import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,9 +74,13 @@ public class FlinkService { private static final Logger LOG = LoggerFactory.getLogger(FlinkService.class); private final NamespacedKubernetesClient kubernetesClient; + private final FlinkOperatorConfiguration operatorConfiguration; - public FlinkService(NamespacedKubernetesClient kubernetesClient) { + public FlinkService( + NamespacedKubernetesClient kubernetesClient, + FlinkOperatorConfiguration operatorConfiguration) { this.kubernetesClient = kubernetesClient; + this.operatorConfiguration = operatorConfiguration; } public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf) @@ -141,7 +147,10 @@ public class FlinkService { final String namespace = config.get(KubernetesConfigOptions.NAMESPACE); final int port = config.getInteger(RestOptions.PORT); final String host = - ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, namespace); + ObjectUtils.firstNonNull( + operatorConfiguration.getFlinkServiceHostOverride(), + ExternalServiceDecorator.getNamespacedExternalServiceName( + clusterId, namespace)); final String restServerAddress = String.format("http://%s:%s", host, port); LOG.debug("Creating RestClusterClient({})", restServerAddress); return new RestClusterClient<>( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index cbb7fb9..38c3374 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -48,7 +48,7 @@ public class TestingFlinkService extends FlinkService { private boolean isPortReady = true; public TestingFlinkService() { - super(null); + super(null, null); } public void clear() { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 226186a..e55bfc4 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -66,7 +66,7 @@ public class FlinkDeploymentControllerTest { private final Context context = TestUtils.createContextWithReadyJobManagerDeployment(); private final FlinkOperatorConfiguration operatorConfiguration = - new FlinkOperatorConfiguration(1, 2, 3, 4); + new FlinkOperatorConfiguration(1, 2, 3, 4, null); private TestingFlinkService flinkService; private FlinkDeploymentController testController; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java index c760629..7fc837f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java @@ -139,7 +139,7 @@ public class FlinkServiceTest { } private FlinkService createFlinkService(ClusterClient<String> clusterClient) { - return new FlinkService((NamespacedKubernetesClient) client) { + return new FlinkService((NamespacedKubernetesClient) client, null) { @Override protected ClusterClient<String> getClusterClient(Configuration config) { return clusterClient;