This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit b84af2592d7f9c61ccee33747d8550f1f118f493 Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Thu Feb 10 15:35:47 2022 +0100 moving to single ingress --- examples/basic-ingress.yaml | 2 +- .../controller/FlinkDeploymentController.java | 16 ++- .../operator/reconciler/JobReconciler.java | 8 +- .../operator/reconciler/SessionReconciler.java | 8 +- .../kubernetes/operator/utils/IngressUtils.java | 112 ++++++++++++++++++++ .../kubernetes/operator/utils/KubernetesUtils.java | 114 --------------------- .../{values.yaml => templates/ingress.yaml} | 44 +++----- helm/flink-operator/templates/rbac.yaml | 1 + helm/flink-operator/values.yaml | 3 + 9 files changed, 154 insertions(+), 154 deletions(-) diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml index c0ad3a2..d846766 100644 --- a/examples/basic-ingress.yaml +++ b/examples/basic-ingress.yaml @@ -20,7 +20,7 @@ apiVersion: flink.apache.org/v1alpha1 kind: FlinkDeployment metadata: namespace: default - name: basic-example + name: basic-ingress spec: image: flink:1.14.3 flinkVersion: 1.14.3 diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 5ab08e5..09eb536 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.reconciler.JobReconciler; import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.kubernetes.operator.utils.IngressUtils; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -77,6 +78,12 @@ public class FlinkDeploymentController public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName()); FlinkUtils.deleteCluster(flinkApp, kubernetesClient); + IngressUtils.updateIngressRules( + flinkApp, + FlinkUtils.getEffectiveConfig(flinkApp), + operatorNamespace, + kubernetesClient, + true); return DeleteControl.defaultDelete(); } @@ -89,7 +96,7 @@ public class FlinkDeploymentController boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig); if (success) { try { - success = reconcileFlinkDeployment(flinkApp, effectiveConfig); + success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig); } catch (Exception e) { throw new RuntimeException( "Error while reconciling deployment change for " @@ -109,10 +116,11 @@ public class FlinkDeploymentController } private boolean reconcileFlinkDeployment( - FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception { + String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) + throws Exception { return flinkApp.getSpec().getJob() == null - ? sessionReconciler.reconcile(flinkApp, effectiveConfig) - : jobReconciler.reconcile(flinkApp, effectiveConfig); + ? sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig) + : jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java index 61468e4..564efc9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java @@ -26,7 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.kubernetes.operator.utils.KubernetesUtils; +import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import io.fabric8.kubernetes.client.KubernetesClient; @@ -51,7 +51,8 @@ public class JobReconciler { this.flinkService = flinkService; } - public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig) + public boolean reconcile( + String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception { JobSpec jobSpec = flinkApp.getSpec().getJob(); @@ -65,7 +66,8 @@ public class JobReconciler { flinkApp, effectiveConfig, Optional.ofNullable(jobSpec.getInitialSavepointPath())); - KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient); + IngressUtils.updateIngressRules( + flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); return true; } catch (Exception e) { LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java index 374ee24..0c7bae5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java @@ -21,7 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.service.FlinkService; -import org.apache.flink.kubernetes.operator.utils.KubernetesUtils; +import org.apache.flink.kubernetes.operator.utils.IngressUtils; import io.fabric8.kubernetes.client.KubernetesClient; import org.slf4j.Logger; @@ -43,13 +43,15 @@ public class SessionReconciler { this.flinkService = flinkService; } - public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig) + public boolean reconcile( + String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception { if (flinkApp.getStatus() == null) { flinkApp.setStatus(new FlinkDeploymentStatus()); try { flinkService.submitSessionCluster(flinkApp, effectiveConfig); - KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient); + IngressUtils.updateIngressRules( + flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false); return true; } catch (Exception e) { LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java new file mode 100644 index 0000000..a412592 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder; +import io.fabric8.kubernetes.api.model.networking.v1.Ingress; +import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder; +import io.fabric8.kubernetes.api.model.networking.v1.IngressRule; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +/** Ingress utilities. */ +public class IngressUtils { + + private static final String INGRESS_NAME = "flink-operator"; + private static final String REST_SVC_NAME_SUFFIX = "-rest"; + + private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class); + + public static void updateIngressRules( + FlinkDeployment flinkDeployment, + Configuration effectiveConfig, + String operatorNamespace, + KubernetesClient client, + boolean remove) { + if (flinkDeployment.getSpec().getIngressDomain() != null) { + final IngressRule ingressRule = fromDeployment(flinkDeployment, effectiveConfig); + getIngress(operatorNamespace, client) + .ifPresent( + ingress -> { + Ingress updated; + if (remove) { + updated = + new IngressBuilder(ingress) + .editSpec() + .removeFromRules(ingressRule) + .endSpec() + .build(); + } else { + updated = + new IngressBuilder(ingress) + .editSpec() + .addToRules(ingressRule) + .endSpec() + .build(); + } + LOG.info("Updating ingress rules {}", ingress); + client.resourceList(updated) + .inNamespace(operatorNamespace) + .createOrReplace(); + }); + } + } + + private static Optional<Ingress> getIngress(String operatorNamespace, KubernetesClient client) { + return Optional.ofNullable( + client.network() + .v1() + .ingresses() + .inNamespace(operatorNamespace) + .withName(INGRESS_NAME) + .get()); + } + + private static IngressRule fromDeployment( + FlinkDeployment flinkDeployment, Configuration effectiveConfig) { + final String clusterId = flinkDeployment.getMetadata().getName(); + final int restPort = effectiveConfig.getInteger(RestOptions.PORT); + final String ingressHost = getIngressHost(flinkDeployment, clusterId); + return new IngressRule( + ingressHost, + new HTTPIngressRuleValueBuilder() + .addNewPath() + .withPathType("ImplementationSpecific") + .withNewBackend() + .withNewService() + .withName(clusterId + REST_SVC_NAME_SUFFIX) + .withNewPort() + .withNumber(restPort) + .endPort() + .endService() + .endBackend() + .endPath() + .build()); + } + + private static String getIngressHost(FlinkDeployment flinkDeployment, String clusterId) { + return String.format("%s.%s", clusterId, flinkDeployment.getSpec().getIngressDomain()); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java deleted file mode 100644 index dd5bcba..0000000 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.operator.utils; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.OwnerReference; -import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder; -import io.fabric8.kubernetes.api.model.networking.v1.Ingress; -import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder; -import io.fabric8.kubernetes.api.model.networking.v1.IngressRule; -import io.fabric8.kubernetes.client.KubernetesClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** Kubernetes related utilities. */ -public class KubernetesUtils { - - public static final String REST_SVC_NAME_SUFFIX = "-rest"; - public static final String INGRESS_API_VERSION = "networking.k8s.io/v1"; - - private static final Logger LOG = LoggerFactory.getLogger(KubernetesUtils.class); - - public static void deployIngress( - FlinkDeployment flinkApp, - Configuration effectiveConfig, - KubernetesClient kubernetesClient) { - if (flinkApp.getSpec().getIngressDomain() != null) { - final List<IngressRule> ingressRules = new ArrayList<>(); - final String clusterId = flinkApp.getMetadata().getName(); - final String namespace = flinkApp.getMetadata().getNamespace(); - final int restPort = effectiveConfig.getInteger(RestOptions.PORT); - final String ingressHost = - String.format("%s.%s", clusterId, flinkApp.getSpec().getIngressDomain()); - ingressRules.add( - new IngressRule( - ingressHost, - new HTTPIngressRuleValueBuilder() - .addNewPath() - .withPathType("ImplementationSpecific") - .withNewBackend() - .withNewService() - .withName(clusterId + REST_SVC_NAME_SUFFIX) - .withNewPort() - .withNumber(restPort) - .endPort() - .endService() - .endBackend() - .endPath() - .build())); - final Ingress ingress = - new IngressBuilder() - .withApiVersion(INGRESS_API_VERSION) - .withNewMetadata() - .withName(clusterId) - .endMetadata() - .withNewSpec() - .withRules(ingressRules) - .endSpec() - .build(); - - Deployment deployment = - kubernetesClient - .apps() - .deployments() - .inNamespace(flinkApp.getMetadata().getNamespace()) - .withName(flinkApp.getMetadata().getName()) - .get(); - KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress)); - LOG.info(ingress.toString()); - kubernetesClient.resourceList(ingress).inNamespace(namespace).createOrReplace(); - } - } - - private static void setOwnerReference(HasMetadata owner, List<HasMetadata> resources) { - final OwnerReference ownerReference = - new OwnerReferenceBuilder() - .withName(owner.getMetadata().getName()) - .withApiVersion(owner.getApiVersion()) - .withUid(owner.getMetadata().getUid()) - .withKind(owner.getKind()) - .withController(true) - .withBlockOwnerDeletion(true) - .build(); - resources.forEach( - resource -> - resource.getMetadata() - .setOwnerReferences(Collections.singletonList(ownerReference))); - } -} diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/templates/ingress.yaml similarity index 69% copy from helm/flink-operator/values.yaml copy to helm/flink-operator/templates/ingress.yaml index dd083e7..ddd990f 100644 --- a/helm/flink-operator/values.yaml +++ b/helm/flink-operator/templates/ingress.yaml @@ -15,33 +15,19 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ - --- - -operatorNamespace: - name: default - -image: - repository: flink-operator - pullPolicy: IfNotPresent - tag: latest - -rbac: - create: true - -serviceAccount: - create: true - annotations: {} - name: "flink-operator" - -webhook: - create: true - keystore: - useDefaultPassword: true - # passwordSecretRef: - # name: jks-password-secret - # key: password-key - -imagePullSecrets: [] -nameOverride: "" -fullnameOverride: "" +{{- if .Values.ingress.create }} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "flink-operator.name" . }} + namespace: {{ .Values.operatorNamespace.name }} + labels: + {{- include "flink-operator.labels" . | nindent 4 }} +spec: + defaultBackend: + service: + name: dummy-http-backend + port: + number: 80 +{{- end }} diff --git a/helm/flink-operator/templates/rbac.yaml b/helm/flink-operator/templates/rbac.yaml index 95d23ef..36b92a1 100644 --- a/helm/flink-operator/templates/rbac.yaml +++ b/helm/flink-operator/templates/rbac.yaml @@ -43,6 +43,7 @@ rules: - events - configmaps - secrets + - nodes verbs: - "*" - apiGroups: diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml index dd083e7..f65fd18 100644 --- a/helm/flink-operator/values.yaml +++ b/helm/flink-operator/values.yaml @@ -29,6 +29,9 @@ image: rbac: create: true +ingress: + create: false + serviceAccount: create: true annotations: {}