This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 26ca85954dec35094331fea67eb79b6380154709
Author: Matyas Orhidi <matyas_orh...@apple.com>
AuthorDate: Wed Feb 2 15:39:58 2022 +0100

    Adding Ingress support
---
 examples/basic-ingress.yaml                        | 28 +++++++
 .../controller/reconciler/JobReconciler.java       |  2 +
 .../controller/reconciler/SessionReconciler.java   |  2 +
 .../operator/crd/spec/FlinkDeploymentSpec.java     |  1 +
 .../kubernetes/operator/utils/FlinkUtils.java      |  6 ++
 .../kubernetes/operator/utils/KubernetesUtils.java | 97 ++++++++++++++++++++++
 6 files changed, 136 insertions(+)

diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
new file mode 100644
index 0000000..1a19c15
--- /dev/null
+++ b/examples/basic-ingress.yaml
@@ -0,0 +1,28 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: basic-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  ingressDomain: flink.k8s.io
+  flinkConfiguration:
+#    rest.address: basic-example.flink.k8s.io
+#    rest.port: "80"
+    taskmanager.numberOfTaskSlots: "2"
+    kubernetes.jobmanager.service-account: flink-operator
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
index 6c9ea2b..baa97f9 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
@@ -14,6 +14,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -47,6 +48,7 @@ public class JobReconciler {
             }
             try {
                 deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
                 return true;
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
index 4984f08..a17a28e 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
@@ -8,6 +8,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
@@ -33,6 +34,7 @@ public class SessionReconciler {
             flinkApp.setStatus(new FlinkDeploymentStatus());
             try {
                 deployFlinkSession(flinkApp, effectiveConfig);
+                KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
                 return true;
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 5a562a3..5851f1c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -13,6 +13,7 @@ public class FlinkDeploymentSpec {
     private String image;
     private String imagePullPolicy;
     private String flinkVersion;
+    private String ingressDomain;
     private Map<String, String> flinkConfiguration;
     private Pod podTemplate;
     private JobManagerSpec jobManager;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 007b5af..4451814 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -55,6 +55,12 @@ public class FlinkUtils {
             effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
             effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
 
+            if (spec.getIngressDomain() != null) {
+                effectiveConfig.set(
+                        KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                        KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+            }
+
             if (spec.getJob() != null) {
                 effectiveConfig.set(
                         DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
new file mode 100644
index 0000000..474eb69
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
@@ -0,0 +1,97 @@
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Kubernetes related utilities. */
+public class KubernetesUtils {
+
+    public static final String REST_SVC_NAME_SUFFIX = "-rest";
+    public static final String INGRESS_API_VERSION = "networking.k8s.io/v1";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesUtils.class);
+
+    public static void deployIngress(
+            FlinkDeployment flinkApp,
+            Configuration effectiveConfig,
+            KubernetesClient kubernetesClient) {
+        if (flinkApp.getSpec().getIngressDomain() != null) {
+            final List<IngressRule> ingressRules = new ArrayList<>();
+            final String clusterId = flinkApp.getMetadata().getName();
+            final String namespace = flinkApp.getMetadata().getNamespace();
+            final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
+            final String ingressHost =
+                    String.format("%s.%s", clusterId, flinkApp.getSpec().getIngressDomain());
+            ingressRules.add(
+                    new IngressRule(
+                            ingressHost,
+                            new HTTPIngressRuleValueBuilder()
+                                    .addNewPath()
+                                    .withPathType("ImplementationSpecific")
+                                    .withNewBackend()
+                                    .withNewService()
+                                    .withName(clusterId + REST_SVC_NAME_SUFFIX)
+                                    .withNewPort()
+                                    .withNumber(restPort)
+                                    .endPort()
+                                    .endService()
+                                    .endBackend()
+                                    .endPath()
+                                    .build()));
+            final Ingress ingress =
+                    new IngressBuilder()
+                            .withApiVersion(INGRESS_API_VERSION)
+                            .withNewMetadata()
+                            .withName(clusterId)
+                            .endMetadata()
+                            .withNewSpec()
+                            .withRules(ingressRules)
+                            .endSpec()
+                            .build();
+
+            Deployment deployment =
+                    kubernetesClient
+                            .apps()
+                            .deployments()
+                            .inNamespace(flinkApp.getMetadata().getNamespace())
+                            .withName(flinkApp.getMetadata().getName())
+                            .get();
+            KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
+            LOG.info(ingress.toString());
+            kubernetesClient.resourceList(ingress).inNamespace(namespace).createOrReplace();
+        }
+    }
+
+    private static void setOwnerReference(HasMetadata owner, List<HasMetadata> resources) {
+        final OwnerReference ownerReference =
+                new OwnerReferenceBuilder()
+                        .withName(owner.getMetadata().getName())
+                        .withApiVersion(owner.getApiVersion())
+                        .withUid(owner.getMetadata().getUid())
+                        .withKind(owner.getKind())
+                        .withController(true)
+                        .withBlockOwnerDeletion(true)
+                        .build();
+        resources.forEach(
+                resource ->
+                        resource.getMetadata()
+                                .setOwnerReferences(Collections.singletonList(ownerReference)));
+    }
+}