This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 7c555d2b4c627b1da3f0cdb33ef18e3070fa18b8 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Thu Jan 20 12:41:26 2022 +0100 Initial project setup --- .gitignore | 36 ++++ Dockerfile | 5 + LICENSE | 201 +++++++++++++++++++++ README.md | 47 ++++- deploy/flink-operator.yaml | 175 ++++++++++++++++++ examples/cr.yaml | 21 +++ pom.xml | 148 +++++++++++++++ .../operator/KubernetesOperatorEntrypoint.java | 42 +++++ .../flink/kubernetes/operator/Utils/Constants.java | 11 ++ .../kubernetes/operator/Utils/FlinkUtils.java | 99 ++++++++++ .../kubernetes/operator/Utils/KubernetesUtils.java | 23 +++ .../controller/FlinkApplicationController.java | 169 +++++++++++++++++ .../kubernetes/operator/crd/FlinkApplication.java | 17 ++ .../operator/crd/FlinkApplicationList.java | 6 + .../operator/crd/spec/FlinkApplicationSpec.java | 30 +++ .../kubernetes/operator/crd/spec/Resource.java | 12 ++ .../crd/status/FlinkApplicationStatus.java | 10 + .../kubernetes/operator/crd/status/JobStatus.java | 15 ++ src/main/resources/log4j2.properties | 8 + 19 files changed, 1074 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ceb6113 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Maven template +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties + +.idea diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..89287a1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre + +COPY target/flink-operator-1.0-SNAPSHOT.jar / + +CMD ["java", "-jar", "/flink-operator-1.0-SNAPSHOT.jar"] diff --git 
a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 4c9483b..845b97d 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,47 @@ # flink-kubernetes-operator -Apache Flink Kubernetes Operator +Temporary repository for Flink Kubernetes Operator. The content will be moved to OSS repo once created and IPR. + +## How to Build +``` +mvn clean install +``` + +## How to Run +* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. The CRD could be find [here](deploy/crd.yaml). If not, issue the following commands to apply: +``` +kubectl apply -f deploy/crd.yaml +``` +* Build Docker Image +``` +docker build . 
-t docker.apple.com/gyula_fora/flink-java-operator:latest +``` +* Start flink-operator deployment +A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. +``` +kubectl apply -f deploy/flink-operator.yaml +``` +* Create a new Flink application +The flink-operator will watch the CRD resources and submit a new Flink application once the CR it applied. +``` +kubectl apply -f deploy/cr.yaml +``` + +* Delete a Flink application +``` +kubectl delete -f deploy/cr.yaml + +OR + +kubectl delete flinkapp {app_name} +``` + +* Get/List Flink applications +Get all the Flink applications running in the K8s cluster +``` +kubectl get flinkapp +``` + +Describe a specific Flink application to show the status(including job status, savepoint, ect.) +``` +kubectl describe flinkapp {app_name} +``` diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml new file mode 100644 index 0000000..564ed12 --- /dev/null +++ b/deploy/flink-operator.yaml @@ -0,0 +1,175 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: flink-operator +spec: + replicas: 1 + selector: + matchLabels: + app: flink-operator + template: + metadata: + labels: + app: flink-operator + spec: + serviceAccountName: flink-operator + containers: + - name: flink-operator + image: docker.apple.com/gyula_fora/flink-java-operator:latest + imagePullPolicy: Always + env: + - name: FLINK_CONF_DIR + value: /opt/flink/conf + volumeMounts: + - name: flink-config-volume + mountPath: /opt/flink/conf + volumes: + - name: flink-config-volume + configMap: + name: flink-config + items: + - key: flink-conf.yaml + path: flink-conf.yaml + - key: log4j-console.properties + path: log4j-console.properties + +--- + +apiVersion: v1 +kind: ConfigMap +metadata: + name: flink-config + labels: + app: flink +data: + flink-conf.yaml: |+ + jobmanager.rpc.address: flink-jobmanager + taskmanager.numberOfTaskSlots: 2 + blob.server.port: 6124 + jobmanager.rpc.port: 6123 + 
taskmanager.rpc.port: 6122 + queryable-state.proxy.ports: 6125 + jobmanager.memory.process.size: 1600m + taskmanager.memory.process.size: 1728m + parallelism.default: 2 + log4j-console.properties: |+ + # This affects logging for both user code and Flink + rootLogger.level = INFO + rootLogger.appenderRef.console.ref = ConsoleAppender + rootLogger.appenderRef.rolling.ref = RollingFileAppender + + # Uncomment this if you want to _only_ change Flink's logging + #logger.flink.name = org.apache.flink + #logger.flink.level = INFO + + # The following lines keep the log level of common libraries/connectors on + # log level INFO. The root logger does not override this. You have to manually + # change the log levels here. + logger.akka.name = akka + logger.akka.level = INFO + logger.kafka.name= org.apache.kafka + logger.kafka.level = INFO + logger.hadoop.name = org.apache.hadoop + logger.hadoop.level = INFO + logger.zookeeper.name = org.apache.zookeeper + logger.zookeeper.level = INFO + + # Log all infos to the console + appender.console.name = ConsoleAppender + appender.console.type = CONSOLE + appender.console.layout.type = PatternLayout + appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + + # Log all infos in the given rolling file + appender.rolling.name = RollingFileAppender + appender.rolling.type = RollingFile + appender.rolling.append = false + appender.rolling.fileName = ${sys:log.file} + appender.rolling.filePattern = ${sys:log.file}.%i + appender.rolling.layout.type = PatternLayout + appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + appender.rolling.policies.type = Policies + appender.rolling.policies.size.type = SizeBasedTriggeringPolicy + appender.rolling.policies.size.size=100MB + appender.rolling.strategy.type = DefaultRolloverStrategy + appender.rolling.strategy.max = 10 + + # Suppress the irrelevant (wrong) warnings from the Netty channel handler + logger.netty.name = 
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline + logger.netty.level = OFF + +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: flink-operator + +--- + +apiVersion: v1 +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: flink-operator +rules: +- apiGroups: + - flink-operator + resources: + - "*" + verbs: + - "*" +- apiGroups: + - "" + resources: + - pods + - services + - endpoints + - persistentvolumeclaims + - events + - configmaps + - secrets + verbs: + - "*" +- apiGroups: + - apps + resources: + - deployments + - replicasets + verbs: + - "*" +- apiGroups: + - extensions + resources: + - deployments + - ingresses + verbs: + - "*" +- apiGroups: + - flink.io + resources: + - flinkapplications + verbs: + - "*" +- apiGroups: + - networking.k8s.io + resources: + - ingresses + verbs: + - "*" + +--- + +apiVersion: v1 +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: flink-operator-cluster-role-binding +subjects: +- kind: ServiceAccount + name: flink-operator + namespace: default +roleRef: + kind: ClusterRole + name: flink-operator + apiGroup: rbac.authorization.k8s.io diff --git a/examples/cr.yaml b/examples/cr.yaml new file mode 100644 index 0000000..9695be0 --- /dev/null +++ b/examples/cr.yaml @@ -0,0 +1,21 @@ +apiVersion: flink.io/v1alpha1 +kind: FlinkApplication +metadata: + namespace: default + name: flink-example-statemachine +spec: + imageName: flink:latest + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 1 + jobManagerResource: + mem: 2048m + cpu: 1 + taskManagerResource: + mem: 2048m + cpu: 1 + savepointsDir: file:///tmp/savepoints + savepointGeneration: 0 + flinkConfig: + taskmanager.numberOfTaskSlots: 2 + kubernetes.jobmanager.service-account: flink-operator + kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" diff --git a/pom.xml b/pom.xml new file mode 
100644 index 0000000..029a069 --- /dev/null +++ b/pom.xml @@ -0,0 +1,148 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.flink</groupId> + <artifactId>flink-operator</artifactId> + <version>1.0-SNAPSHOT</version> + <packaging>jar</packaging> + <properties> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version> + <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version> + + <operator.sdk.version>2.0.1</operator.sdk.version> + <fabric8.version>5.11.2</fabric8.version> + <lombok.version>1.18.10</lombok.version> + + <scala.version>2.12</scala.version> + <flink.version>1.14.3</flink.version> + + <slf4j.version>1.7.15</slf4j.version> + <log4j.version>2.17.0</log4j.version> + </properties> + + <dependencies> + <dependency> + <groupId>io.javaoperatorsdk</groupId> + <artifactId>operator-framework</artifactId> + <version>${operator.sdk.version}</version> + </dependency> + + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>crd-generator-apt</artifactId> + <version>${fabric8.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.takes</groupId> + <artifactId>takes</artifactId> + <version>1.19</version> + </dependency> + + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-server-mock</artifactId> + <version>${fabric8.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-clients_${scala.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-kubernetes_${scala.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <version>${log4j.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j.version}</version> + </dependency> + + <dependency> + <!-- API bridge between log4j 1 and 2 --> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + <version>${log4j.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink-operator</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration combine.children="append"> + <artifactSet> + <includes combine.children="append"> + <include>*:*</include> + </includes> + </artifactSet> + <transformers combine.children="append"> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.kubernetes.operator.KubernetesOperatorEntrypoint</mainClass> + </transformer> + <!-- The service transformer is needed to merge META-INF/services files --> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <!-- The ApacheNoticeResourceTransformer 
collects and aggregates NOTICE files --> + <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> + <projectName>Apache Flink</projectName> + <encoding>UTF-8</encoding> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + </plugin> + </plugins> + </build> + +</project> diff --git a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java b/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java new file mode 100644 index 0000000..b7f705f --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java @@ -0,0 +1,42 @@ +package org.apache.flink.kubernetes.operator; + +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.Operator; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; +import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService; +import org.apache.flink.kubernetes.operator.controller.FlinkApplicationController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.takes.facets.fork.FkRegex; +import org.takes.facets.fork.TkFork; +import org.takes.http.Exit; +import org.takes.http.FtBasic; + +import java.io.IOException; + +/** + * Main Class for Flink native k8s operator. 
+ */ +public class KubernetesOperatorEntrypoint { + private static final Logger LOG = LoggerFactory.getLogger(KubernetesOperatorEntrypoint.class); + + public static void main(String args[]) throws IOException { + + LOG.info("Starting Flink Kubernetes Operator"); + + KubernetesClient client = new DefaultKubernetesClient(); + String namespace = client.getNamespace(); + if (namespace == null) { + namespace = "default"; + } + Operator operator = new Operator(client, + new ConfigurationServiceOverrider(DefaultConfigurationService.instance()) + .build()); + operator.register(new FlinkApplicationController(client, namespace)); + operator.installShutdownHook(); + operator.start(); + + new FtBasic(new TkFork(new FkRegex("/health", "ALL GOOD!")), 8080).start(Exit.NEVER); + } +} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/Utils/Constants.java b/src/main/java/org/apache/flink/kubernetes/operator/Utils/Constants.java new file mode 100644 index 0000000..a117011 --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/Utils/Constants.java @@ -0,0 +1,11 @@ +package org.apache.flink.kubernetes.operator.Utils; + +public class Constants { + public static final String FLINK_NATIVE_K8S_OPERATOR_NAME = "flink-operator"; + public static final String KUBERNETES_APP_TARGET = "kubernetes-application"; + + public static final String REST_SVC_NAME_SUFFIX = "-rest"; + + public static final String INGRESS_API_VERSION = "networking.k8s.io/v1"; + public static final String INGRESS_SUFFIX = ".flink.io"; +} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java new file mode 100644 index 0000000..4a50a64 --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java @@ -0,0 +1,99 @@ +package org.apache.flink.kubernetes.operator.Utils; + +import org.apache.flink.client.deployment.StandaloneClientFactory; +import 
org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.util.StringUtils; + +import java.net.URI; +import java.util.Collections; + +public class FlinkUtils { + + public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkApplicationSpec spec) throws Exception { + final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR); + final Configuration effectiveConfig; + if (flinkConfDir != null) { + effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir); + } else { + effectiveConfig = new Configuration(); + } + + // Basic config options + final URI uri = new URI(spec.getJarURI()); + effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace); + effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId); + effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET); + // Set rest service exposed type to clusterIP since we will use ingress to 
access the webui + effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP); + + // Image + if (!StringUtils.isNullOrWhitespaceOnly(spec.getImageName())) { + effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImageName()); + } + if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) { + effectiveConfig.set( + KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, + KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy())); + } + + // Jars + effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString())); + + // Parallelism and Resource + if (spec.getParallelism() > 0) { + effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getParallelism()); + } + if (spec.getJobManagerResource() != null) { + effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManagerResource().getMem()); + effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManagerResource().getCpu()); + } + if (spec.getTaskManagerResource() != null) { + effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManagerResource().getMem()); + effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManagerResource().getCpu()); + } + + // Savepoint + if (!StringUtils.isNullOrWhitespaceOnly(spec.getFromSavepoint())) { + effectiveConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, spec.getFromSavepoint()); + effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, spec.isAllowNonRestoredState()); + } + if (!StringUtils.isNullOrWhitespaceOnly(spec.getSavepointsDir())) { + effectiveConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, spec.getSavepointsDir()); + } + + // Dynamic configuration + if (spec.getFlinkConfig() != null && !spec.getFlinkConfig().isEmpty()) { + spec.getFlinkConfig().forEach(effectiveConfig::setString); + } + + return effectiveConfig; + 
} + + + public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception { + final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID); + final String namespace = config.get(KubernetesConfigOptions.NAMESPACE); + final int port = config.getInteger(RestOptions.PORT); + final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port); + return new RestClusterClient<>( + config, + clusterId, + (c,e) -> new StandaloneClientHAServices(restServerAddress)); + } +} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/Utils/KubernetesUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/Utils/KubernetesUtils.java new file mode 100644 index 0000000..323eb80 --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/Utils/KubernetesUtils.java @@ -0,0 +1,23 @@ +package org.apache.flink.kubernetes.operator.Utils; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.OwnerReference; +import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; + +import java.util.Collections; +import java.util.List; + +public class KubernetesUtils { + public static void setOwnerReference(HasMetadata owner, List<HasMetadata> resources) { + final OwnerReference ownerReference = new OwnerReferenceBuilder() + .withName(owner.getMetadata().getName()) + .withApiVersion(owner.getApiVersion()) + .withUid(owner.getMetadata().getUid()) + .withKind(owner.getKind()) + .withController(true) + .withBlockOwnerDeletion(true) + .build(); + resources.forEach(resource -> + resource.getMetadata().setOwnerReferences(Collections.singletonList(ownerReference))); + } +} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java new file mode 100644 index 0000000..54e47f0 --- /dev/null +++ 
package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
import io.fabric8.kubernetes.client.KubernetesClient;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.ApplicationDeployer;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.Utils.Constants;
import org.apache.flink.kubernetes.operator.Utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.Utils.KubernetesUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkApplication;
import org.apache.flink.runtime.client.JobStatusMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.flink.kubernetes.operator.Utils.Constants.FLINK_NATIVE_K8S_OPERATOR_NAME;

/**
 * Reconciler for the {@link FlinkApplication} custom resource: deploys new Flink application
 * clusters, recovers state after operator restart, triggers savepoints when the spec's
 * savepointGeneration is bumped, and maintains a shared Ingress exposing each cluster's REST UI.
 */
@ControllerConfiguration
public class FlinkApplicationController
        implements Reconciler<FlinkApplication>,
                ErrorStatusHandler<FlinkApplication>,
                EventSourceInitializer<FlinkApplication> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkApplicationController.class);
    private static final int POLL_PERIOD = 3000;

    private final KubernetesClient kubernetesClient;

    /** Known applications and their effective Flink configuration, keyed by cluster id. */
    private final Map<String, Tuple2<FlinkApplication, Configuration>> flinkApps;
    /** Last known savepoint path per job id. Concurrent map: reconciles may run in parallel. */
    private final Map<String, String> savepointLocation;

    private final String operatorNamespace;

    public FlinkApplicationController(KubernetesClient kubernetesClient, String namespace) {
        this.kubernetesClient = kubernetesClient;
        this.operatorNamespace = namespace;

        this.flinkApps = new ConcurrentHashMap<>();
        // Was a plain HashMap; made concurrent for consistency with flinkApps since both are
        // touched from reconcile callbacks.
        this.savepointLocation = new ConcurrentHashMap<>();
    }

    @Override
    public DeleteControl cleanup(FlinkApplication flinkApp, Context context) {
        LOG.info("Cleaning up application {}", flinkApp);
        // Deleting the JobManager deployment (cascading) tears down the whole Flink cluster.
        kubernetesClient
                .apps()
                .deployments()
                .inNamespace(flinkApp.getMetadata().getNamespace())
                .withName(flinkApp.getMetadata().getName())
                .cascading(true)
                .delete();
        return DeleteControl.defaultDelete();
    }

    @Override
    public UpdateControl<FlinkApplication> reconcile(FlinkApplication flinkApp, Context context) {
        LOG.info("Reconciling application {}", flinkApp);
        final String namespace = flinkApp.getMetadata().getNamespace();
        final String clusterId = flinkApp.getMetadata().getName();
        final Deployment deployment =
                kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get();

        final Configuration effectiveConfig;
        try {
            effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
        } catch (Exception e) {
            LOG.error("Failed to load configuration", e);
            throw new RuntimeException("Failed to load configuration", e);
        }

        // Neither tracked nor deployed: this is a brand new application.
        if (!flinkApps.containsKey(clusterId) && deployment == null) {
            final ClusterClientServiceLoader clusterClientServiceLoader =
                    new DefaultClusterClientServiceLoader();
            final ApplicationDeployer deployer =
                    new ApplicationClusterDeployer(clusterClientServiceLoader);

            final ApplicationConfiguration applicationConfiguration =
                    new ApplicationConfiguration(
                            flinkApp.getSpec().getMainArgs(), flinkApp.getSpec().getEntryClass());
            try {
                deployer.run(effectiveConfig, applicationConfiguration);
            } catch (Exception e) {
                LOG.error("Failed to deploy cluster {}", clusterId, e);
                // BUGFIX: previously the app was registered and the ingress updated even when the
                // deployment failed, making a failed cluster look running. Bail out instead so the
                // next reconcile retries the deployment.
                return UpdateControl.noUpdate();
            }

            flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig));

            updateIngress();
        } else {
            // Deployment exists but we have no record of it: operator restarted, just re-track.
            if (!flinkApps.containsKey(clusterId)) {
                LOG.info("Recovering {}", clusterId);
                flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig));
                return UpdateControl.noUpdate();
            }
            // Tracked but no deployment: the Flink app was deleted out-of-band.
            if (deployment == null) {
                LOG.warn("{} was deleted externally.", clusterId);
                flinkApps.remove(clusterId);
                return UpdateControl.noUpdate();
            }

            FlinkApplication oldFlinkApp = flinkApps.get(clusterId).f0;

            // Trigger a new savepoint if the spec's savepointGeneration was incremented.
            triggerSavepoint(oldFlinkApp, flinkApp, effectiveConfig);

            // TODO support more fields updating, e.g. image, resources
        }
        return UpdateControl.updateResource(flinkApp);
    }

    @Override
    public List<EventSource> prepareEventSources(
            EventSourceContext<FlinkApplication> eventSourceContext) {
        // TODO: poll job status periodically (PerResourcePollingEventSource with POLL_PERIOD).
        return Collections.emptyList();
    }

    @Override
    public Optional<FlinkApplication> updateErrorStatus(
            FlinkApplication flinkApplication, RetryInfo retryInfo, RuntimeException e) {
        // TODO: Set error status
        return Optional.empty();
    }

    /**
     * Rebuilds the single shared Ingress with one host rule per tracked application and makes it
     * owned by the operator deployment so it is garbage-collected with the operator.
     */
    private void updateIngress() {
        final List<IngressRule> ingressRules = new ArrayList<>();
        for (Tuple2<FlinkApplication, Configuration> entry : flinkApps.values()) {
            final FlinkApplication flinkApp = entry.f0;
            final String clusterId = flinkApp.getMetadata().getName();
            final int restPort = entry.f1.getInteger(RestOptions.PORT);

            final String ingressHost = clusterId + Constants.INGRESS_SUFFIX;
            ingressRules.add(
                    new IngressRule(
                            ingressHost,
                            new HTTPIngressRuleValueBuilder()
                                    .addNewPath()
                                    .withNewBackend()
                                    .withNewService()
                                    .withName(clusterId + Constants.REST_SVC_NAME_SUFFIX)
                                    .withNewPort(null, restPort)
                                    .endService()
                                    .endBackend()
                                    .withPathType("Prefix")
                                    .withPath("/")
                                    .endPath()
                                    .build()));
        }
        final Ingress ingress =
                new IngressBuilder()
                        .withApiVersion(Constants.INGRESS_API_VERSION)
                        .withNewMetadata()
                        .withName(FLINK_NATIVE_K8S_OPERATOR_NAME)
                        .endMetadata()
                        .withNewSpec()
                        .withRules(ingressRules)
                        .endSpec()
                        .build();
        // Owner reference to the operator deployment ties the Ingress lifecycle to the operator.
        final Deployment deployment =
                kubernetesClient
                        .apps()
                        .deployments()
                        .inNamespace(operatorNamespace)
                        .withName(FLINK_NATIVE_K8S_OPERATOR_NAME)
                        .get();
        if (deployment == null) {
            LOG.warn("Could not find deployment {}", FLINK_NATIVE_K8S_OPERATOR_NAME);
        } else {
            KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
        }
        kubernetesClient.resourceList(ingress).inNamespace(operatorNamespace).createOrReplace();
    }

    /**
     * Triggers a savepoint for every job of the cluster when the new spec's savepointGeneration is
     * greater than the old one; resulting paths are remembered in {@link #savepointLocation}.
     */
    private void triggerSavepoint(
            FlinkApplication oldFlinkApp, FlinkApplication newFlinkApp, Configuration effectiveConfig) {
        final int generation = newFlinkApp.getSpec().getSavepointGeneration();
        if (generation > oldFlinkApp.getSpec().getSavepointGeneration()) {
            try (ClusterClient<String> clusterClient =
                    FlinkUtils.getRestClusterClient(effectiveConfig)) {
                final CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture =
                        clusterClient.listJobs();
                jobDetailsFuture
                        .get()
                        .forEach(
                                status -> {
                                    LOG.debug(
                                            "JobStatus for {}: {}",
                                            clusterClient.getClusterId(),
                                            status);
                                    clusterClient
                                            .triggerSavepoint(status.getJobId(), null)
                                            .thenAccept(
                                                    path ->
                                                            savepointLocation.put(
                                                                    status.getJobId().toString(),
                                                                    path))
                                            .join();
                                });
            } catch (Exception e) {
                // BUGFIX: the exception was previously dropped; include it in the log.
                LOG.warn("Failed to trigger a new savepoint with generation {}", generation, e);
            }
        }
    }
}
package org.apache.flink.kubernetes.operator.crd.spec;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
 * Spec of the FlinkApplication custom resource: describes the image, user program, resources and
 * savepoint settings from which the operator derives the effective Flink configuration.
 *
 * <p>Note: the unused {@code io.fabric8.kubernetes.api.model.KubernetesResource} import was
 * removed; this class does not implement that interface.
 */
@Data
@NoArgsConstructor
public class FlinkApplicationSpec {
    // Container image for the Flink cluster; empty/blank keeps Flink's default image.
    private String imageName;
    // Image pull policy; must match a KubernetesConfigOptions.ImagePullPolicy enum constant.
    private String imagePullPolicy;

    // URI of the user jar, set as the pipeline jar in the effective config.
    private String jarURI;
    // Program arguments passed to the application's main method.
    private String[] mainArgs = new String[0];
    // Fully qualified main class; null lets Flink resolve it from the jar manifest.
    private String entryClass;

    // Default job parallelism; values <= 0 leave Flink's default untouched.
    private int parallelism;

    // Total process memory + CPU for the JobManager pod; null means Flink defaults.
    private Resource jobManagerResource;
    // Total process memory + CPU for each TaskManager pod; null means Flink defaults.
    private Resource taskManagerResource;

    // Savepoint path to restore the job from, if any.
    private String fromSavepoint;
    // Whether restoring may skip state that cannot be mapped to the new program.
    private boolean allowNonRestoredState = false;
    // Default directory for newly created savepoints.
    private String savepointsDir;
    // Monotonically increased by the user to request a new savepoint.
    private int savepointGeneration;

    // Free-form Flink configuration overrides applied on top of the generated config.
    private Map<String, String> flinkConfig;
}
b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java @@ -0,0 +1,12 @@ +package org.apache.flink.kubernetes.operator.crd.spec; + +import io.fabric8.kubernetes.api.model.KubernetesResource; +import lombok.*; + +@Data +@NoArgsConstructor +public class Resource implements KubernetesResource { + private double cpu; + // 1024m, 1g + private String mem; +} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java new file mode 100644 index 0000000..4b27f1b --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java @@ -0,0 +1,10 @@ +package org.apache.flink.kubernetes.operator.crd.status; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class FlinkApplicationStatus { + private JobStatus[] jobStatuses; +} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java new file mode 100644 index 0000000..e0265eb --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java @@ -0,0 +1,15 @@ +package org.apache.flink.kubernetes.operator.crd.status; + +import io.fabric8.kubernetes.api.model.KubernetesResource; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class JobStatus implements KubernetesResource { + private String jobName; + private String jobId; + private String state; + private String updateTime; + private String savepointLocation; +} diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..97e583e --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,8 @@ +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +# Log all infos to 
the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n