This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 26ca85954dec35094331fea67eb79b6380154709
Author: Matyas Orhidi <matyas_orh...@apple.com>
AuthorDate: Wed Feb 2 15:39:58 2022 +0100

    Adding Ingress support
---
 examples/basic-ingress.yaml                        | 28 +++++++
 .../controller/reconciler/JobReconciler.java       |  2 +
 .../controller/reconciler/SessionReconciler.java   |  2 +
 .../operator/crd/spec/FlinkDeploymentSpec.java     |  1 +
 .../kubernetes/operator/utils/FlinkUtils.java      |  6 ++
 .../kubernetes/operator/utils/KubernetesUtils.java | 97 ++++++++++++++++++++++
 6 files changed, 136 insertions(+)

diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
new file mode 100644
index 0000000..1a19c15
--- /dev/null
+++ b/examples/basic-ingress.yaml
@@ -0,0 +1,28 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: basic-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  ingressDomain: flink.k8s.io
+  flinkConfiguration:
+#    rest.address: basic-example.flink.k8s.io
+#    rest.port: "80"
+    taskmanager.numberOfTaskSlots: "2"
+    kubernetes.jobmanager.service-account: flink-operator
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
index 6c9ea2b..baa97f9 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
@@ -14,6 +14,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -47,6 +48,7 @@ public class JobReconciler {
             }
             try {
                 deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
                 return true;
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
index 4984f08..a17a28e 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
@@ -8,6 +8,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
@@ -33,6 +34,7 @@ public class SessionReconciler {
             flinkApp.setStatus(new FlinkDeploymentStatus());
             try {
                 deployFlinkSession(flinkApp, effectiveConfig);
+                KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
                 return true;
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 5a562a3..5851f1c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -13,6 +13,7 @@ public class FlinkDeploymentSpec {
     private String image;
     private String imagePullPolicy;
     private String flinkVersion;
+    private String ingressDomain;
     private Map<String, String> flinkConfiguration;
     private Pod podTemplate;
     private JobManagerSpec jobManager;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 007b5af..4451814 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -55,6 +55,12 @@ public class FlinkUtils {
             effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
             effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
 
+            if (spec.getIngressDomain() != null) {
+                effectiveConfig.set(
+                        KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                        KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+            }
+
             if (spec.getJob() != null) {
                 effectiveConfig.set(
                         DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
new file mode 100644
index 0000000..474eb69
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
@@ -0,0 +1,97 @@
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Kubernetes related utilities. */
+public class KubernetesUtils {
+
+    public static final String REST_SVC_NAME_SUFFIX = "-rest";
+    public static final String INGRESS_API_VERSION = "networking.k8s.io/v1";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesUtils.class);
+
+    public static void deployIngress(
+            FlinkDeployment flinkApp,
+            Configuration effectiveConfig,
+            KubernetesClient kubernetesClient) {
+        if (flinkApp.getSpec().getIngressDomain() != null) {
+            final List<IngressRule> ingressRules = new ArrayList<>();
+            final String clusterId = flinkApp.getMetadata().getName();
+            final String namespace = flinkApp.getMetadata().getNamespace();
+            final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
+            final String ingressHost =
+                    String.format("%s.%s", clusterId, flinkApp.getSpec().getIngressDomain());
+            ingressRules.add(
+                    new IngressRule(
+                            ingressHost,
+                            new HTTPIngressRuleValueBuilder()
+                                    .addNewPath()
+                                    .withPathType("ImplementationSpecific")
+                                    .withNewBackend()
+                                    .withNewService()
+                                    .withName(clusterId + REST_SVC_NAME_SUFFIX)
+                                    .withNewPort()
+                                    .withNumber(restPort)
+                                    .endPort()
+                                    .endService()
+                                    .endBackend()
+                                    .endPath()
+                                    .build()));
+            final Ingress ingress =
+                    new IngressBuilder()
+                            .withApiVersion(INGRESS_API_VERSION)
+                            .withNewMetadata()
+                            .withName(clusterId)
+                            .endMetadata()
+                            .withNewSpec()
+                            .withRules(ingressRules)
+                            .endSpec()
+                            .build();
+
+            Deployment deployment =
+                    kubernetesClient
+                            .apps()
+                            .deployments()
+                            .inNamespace(flinkApp.getMetadata().getNamespace())
+                            .withName(flinkApp.getMetadata().getName())
+                            .get();
+            KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
+            LOG.info(ingress.toString());
+            kubernetesClient.resourceList(ingress).inNamespace(namespace).createOrReplace();
+        }
+    }
+
+    private static void setOwnerReference(HasMetadata owner, List<HasMetadata> resources) {
+        final OwnerReference ownerReference =
+                new OwnerReferenceBuilder()
+                        .withName(owner.getMetadata().getName())
+                        .withApiVersion(owner.getApiVersion())
+                        .withUid(owner.getMetadata().getUid())
+                        .withKind(owner.getKind())
+                        .withController(true)
+                        .withBlockOwnerDeletion(true)
+                        .build();
+        resources.forEach(
+                resource ->
+                        resource.getMetadata()
+                                .setOwnerReferences(Collections.singletonList(ownerReference)));
+    }
+}

Reply via email to