This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bee794  Use localhost address when running outside k8s
3bee794 is described below

commit 3bee794e3283091bb73a7aad873167c473a8d97c
Author: Thomas Weise <t...@apache.org>
AuthorDate: Fri Mar 11 11:00:23 2022 -0800

    Use localhost address when running outside k8s
---
 .../org/apache/flink/kubernetes/operator/FlinkOperator.java |  4 +++-
 .../operator/config/FlinkOperatorConfiguration.java         | 11 ++++++++++-
 .../flink/kubernetes/operator/service/FlinkService.java     | 13 +++++++++++--
 .../flink/kubernetes/operator/TestingFlinkService.java      |  2 +-
 .../operator/controller/FlinkDeploymentControllerTest.java  |  2 +-
 .../flink/kubernetes/operator/service/FlinkServiceTest.java |  2 +-
 6 files changed, 27 insertions(+), 7 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 1125b65..f3a00e4 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -54,9 +54,11 @@ public class FlinkOperator {
         DefaultConfigurationService configurationService = 
DefaultConfigurationService.instance();
         Operator operator = new Operator(client, configurationService);
 
-        FlinkService flinkService = new FlinkService(client);
         FlinkOperatorConfiguration operatorConfiguration =
                 
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
+
+        FlinkService flinkService = new FlinkService(client, 
operatorConfiguration);
+
         FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
         ReconcilerFactory reconcilerFactory =
                 new ReconcilerFactory(client, flinkService, 
operatorConfiguration);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 1ffff66..ea90ec5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.operator.config;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 
 import lombok.Value;
 
@@ -30,6 +31,7 @@ public class FlinkOperatorConfiguration {
     int progressCheckIntervalSeconds;
     int restApiReadyDelaySeconds;
     int savepointTriggerGracePeriodSeconds;
+    String flinkServiceHostOverride;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         int reconcileIntervalSeconds =
@@ -49,10 +51,17 @@ public class FlinkOperatorConfiguration {
                         OperatorConfigOptions
                                 
.OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD_IN_SEC);
 
+        String flinkServiceHostOverride = null;
+        if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
+            // not running in k8s, simplify local development
+            flinkServiceHostOverride = "localhost";
+        }
+
         return new FlinkOperatorConfiguration(
                 reconcileIntervalSeconds,
                 progressCheckIntervalSeconds,
                 restApiReadyDelaySeconds,
-                savepointTriggerGracePeriodSeconds);
+                savepointTriggerGracePeriodSeconds,
+                flinkServiceHostOverride);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index f3e8655..4dbd1e6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -52,6 +53,7 @@ import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMes
 import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,9 +74,13 @@ public class FlinkService {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkService.class);
 
     private final NamespacedKubernetesClient kubernetesClient;
+    private final FlinkOperatorConfiguration operatorConfiguration;
 
-    public FlinkService(NamespacedKubernetesClient kubernetesClient) {
+    public FlinkService(
+            NamespacedKubernetesClient kubernetesClient,
+            FlinkOperatorConfiguration operatorConfiguration) {
         this.kubernetesClient = kubernetesClient;
+        this.operatorConfiguration = operatorConfiguration;
     }
 
     public void submitApplicationCluster(FlinkDeployment deployment, 
Configuration conf)
@@ -141,7 +147,10 @@ public class FlinkService {
         final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
         final int port = config.getInteger(RestOptions.PORT);
         final String host =
-                
ExternalServiceDecorator.getNamespacedExternalServiceName(clusterId, namespace);
+                ObjectUtils.firstNonNull(
+                        operatorConfiguration.getFlinkServiceHostOverride(),
+                        
ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
         final String restServerAddress = String.format("http://%s:%s", host, 
port);
         LOG.debug("Creating RestClusterClient({})", restServerAddress);
         return new RestClusterClient<>(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index cbb7fb9..38c3374 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -48,7 +48,7 @@ public class TestingFlinkService extends FlinkService {
     private boolean isPortReady = true;
 
     public TestingFlinkService() {
-        super(null);
+        super(null, null);
     }
 
     public void clear() {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 226186a..e55bfc4 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -66,7 +66,7 @@ public class FlinkDeploymentControllerTest {
 
     private final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
     private final FlinkOperatorConfiguration operatorConfiguration =
-            new FlinkOperatorConfiguration(1, 2, 3, 4);
+            new FlinkOperatorConfiguration(1, 2, 3, 4, null);
 
     private TestingFlinkService flinkService;
     private FlinkDeploymentController testController;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
index c760629..7fc837f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
@@ -139,7 +139,7 @@ public class FlinkServiceTest {
     }
 
     private FlinkService createFlinkService(ClusterClient<String> 
clusterClient) {
-        return new FlinkService((NamespacedKubernetesClient) client) {
+        return new FlinkService((NamespacedKubernetesClient) client, null) {
             @Override
             protected ClusterClient<String> getClusterClient(Configuration 
config) {
                 return clusterClient;

Reply via email to