This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit f3d12d6535b93a4002ba30fdb00573276d191fad Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Thu Jan 27 20:02:00 2022 +0100 CRD alignment + first working flink deployment --- README.md | 10 +- deploy/flink-operator.yaml | 82 +--------- deploy/rbac.yaml | 76 +++++++++ examples/basic.yaml | 27 +++ examples/cr.yaml | 21 --- examples/pod-template.yaml | 49 ++++++ pom.xml | 2 +- ...sOperatorEntrypoint.java => FlinkOperator.java} | 8 +- .../kubernetes/operator/Utils/FlinkUtils.java | 181 ++++++++++++--------- .../controller/FlinkApplicationController.java | 169 ------------------- .../controller/FlinkDeploymentController.java | 123 ++++++++++++++ ...{FlinkApplication.java => FlinkDeployment.java} | 6 +- ...plicationList.java => FlinkDeploymentList.java} | 2 +- .../kubernetes/operator/crd/spec/CancelMode.java | 10 ++ .../operator/crd/spec/FlinkApplicationSpec.java | 30 ---- .../operator/crd/spec/FlinkDeploymentSpec.java | 22 +++ .../operator/crd/spec/JobManagerSpec.java | 14 ++ .../kubernetes/operator/crd/spec/JobSpec.java | 16 ++ .../kubernetes/operator/crd/spec/Resource.java | 4 +- .../kubernetes/operator/crd/spec/RestoreMode.java | 12 ++ .../operator/crd/spec/TaskManagerSpec.java | 14 ++ ...ationStatus.java => FlinkDeploymentStatus.java} | 2 +- src/main/resources/log4j2.properties | 2 +- 23 files changed, 489 insertions(+), 393 deletions(-) diff --git a/README.md b/README.md index 845b97d..b0b58f9 100644 --- a/README.md +++ b/README.md @@ -7,17 +7,17 @@ mvn clean install ``` ## How to Run -* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. The CRD could be find [here](deploy/crd.yaml). If not, issue the following commands to apply: +* Make Sure that FlinkApplication Custom Resource Definition is already applied onto the cluster. 
If not, issue the following commands to apply: ``` -kubectl apply -f deploy/crd.yaml +k apply -f target/classes/META-INF/fabric8/flinkapplications.flink.io-v1.yml ``` -* Build Docker Image +* (Optional) Build Docker Image ``` docker build . -t docker.apple.com/gyula_fora/flink-java-operator:latest ``` -* Start flink-operator deployment -A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. +* Start flink-operator deployment. A new `ServiceAccount` "flink-operator" will be created with enough permission to create/list pods and services. ``` +kubectl apply -f deploy/rbac.yaml kubectl apply -f deploy/flink-operator.yaml ``` * Create a new Flink application diff --git a/deploy/flink-operator.yaml b/deploy/flink-operator.yaml index 564ed12..c2698c7 100644 --- a/deploy/flink-operator.yaml +++ b/deploy/flink-operator.yaml @@ -15,7 +15,7 @@ spec: serviceAccountName: flink-operator containers: - name: flink-operator - image: docker.apple.com/gyula_fora/flink-java-operator:latest + image: docker.apple.com/matyas_orhidi/flink-java-operator:latest imagePullPolicy: Always env: - name: FLINK_CONF_DIR @@ -26,7 +26,7 @@ spec: volumes: - name: flink-config-volume configMap: - name: flink-config + name: flink-operator-config items: - key: flink-conf.yaml path: flink-conf.yaml @@ -38,7 +38,7 @@ spec: apiVersion: v1 kind: ConfigMap metadata: - name: flink-config + name: flink-operator-config labels: app: flink data: @@ -97,79 +97,3 @@ data: # Suppress the irrelevant (wrong) warnings from the Netty channel handler logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF - ---- - -apiVersion: v1 -kind: ServiceAccount -metadata: - name: flink-operator - ---- - -apiVersion: v1 -kind: ClusterRole -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: flink-operator -rules: -- apiGroups: - - flink-operator - resources: - - "*" - verbs: - - "*" -- apiGroups: - - "" 
- resources: - - pods - - services - - endpoints - - persistentvolumeclaims - - events - - configmaps - - secrets - verbs: - - "*" -- apiGroups: - - apps - resources: - - deployments - - replicasets - verbs: - - "*" -- apiGroups: - - extensions - resources: - - deployments - - ingresses - verbs: - - "*" -- apiGroups: - - flink.io - resources: - - flinkapplications - verbs: - - "*" -- apiGroups: - - networking.k8s.io - resources: - - ingresses - verbs: - - "*" - ---- - -apiVersion: v1 -kind: ClusterRoleBinding -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: flink-operator-cluster-role-binding -subjects: -- kind: ServiceAccount - name: flink-operator - namespace: default -roleRef: - kind: ClusterRole - name: flink-operator - apiGroup: rbac.authorization.k8s.io diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml new file mode 100644 index 0000000..062ecab --- /dev/null +++ b/deploy/rbac.yaml @@ -0,0 +1,76 @@ +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: flink-operator + +--- + +apiVersion: v1 +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: flink-operator +rules: +- apiGroups: + - flink-operator + resources: + - "*" + verbs: + - "*" +- apiGroups: + - "" + resources: + - pods + - services + - endpoints + - persistentvolumeclaims + - events + - configmaps + - secrets + verbs: + - "*" +- apiGroups: + - apps + resources: + - deployments + - replicasets + verbs: + - "*" +- apiGroups: + - extensions + resources: + - deployments + - ingresses + verbs: + - "*" +- apiGroups: + - flink.io + resources: + - flinkdeployments + - flinkdeployments/status + verbs: + - "*" +- apiGroups: + - networking.k8s.io + resources: + - ingresses + verbs: + - "*" + +--- + +apiVersion: v1 +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: flink-operator-cluster-role-binding +subjects: +- kind: ServiceAccount + name: flink-operator + namespace: default +roleRef: + kind: ClusterRole + name: flink-operator 
+ apiGroup: rbac.authorization.k8s.io diff --git a/examples/basic.yaml b/examples/basic.yaml new file mode 100644 index 0000000..9fdc289 --- /dev/null +++ b/examples/basic.yaml @@ -0,0 +1,27 @@ +apiVersion: flink.io/v1alpha1 +kind: FlinkDeployment +metadata: + namespace: default + name: basic-example +spec: + image: flink:1.14.3 + flinkVersion: 1.14.3 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + kubernetes.jobmanager.service-account: flink-operator + kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" + jobManager: + replicas: 1 + resource: + memory: "2048m" + cpu: 1 + taskManager: + taskSlots: 2 + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 2 + cancelMode: none + restoreMode: none diff --git a/examples/cr.yaml b/examples/cr.yaml deleted file mode 100644 index 9695be0..0000000 --- a/examples/cr.yaml +++ /dev/null @@ -1,21 +0,0 @@ -apiVersion: flink.io/v1alpha1 -kind: FlinkApplication -metadata: - namespace: default - name: flink-example-statemachine -spec: - imageName: flink:latest - jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar - parallelism: 1 - jobManagerResource: - mem: 2048m - cpu: 1 - taskManagerResource: - mem: 2048m - cpu: 1 - savepointsDir: file:///tmp/savepoints - savepointGeneration: 0 - flinkConfig: - taskmanager.numberOfTaskSlots: 2 - kubernetes.jobmanager.service-account: flink-operator - kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml new file mode 100644 index 0000000..02fea81 --- /dev/null +++ b/examples/pod-template.yaml @@ -0,0 +1,49 @@ +apiVersion: flink.io/v1alpha1 +kind: FlinkDeployment +metadata: + namespace: default + name: pod-template-example +spec: + image: flink:1.14.3 + flinkVersion: 1.14.3 + flinkConfiguration: + 
taskmanager.numberOfTaskSlots: "2" + kubernetes.jobmanager.service-account: flink-operator + kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" + podTemplate: + apiVersion: v1 + kind: Pod + metadata: + name: pod-template + spec: + containers: + # Do not change the main container name + - name: flink-main-container + volumeMounts: + - mountPath: /opt/flink/log + name: flink-logs + # Sample sidecar container + - name: fluentbit + image: fluent/fluent-bit:1.8.12-debug + command: [ 'sh','-c','/fluent-bit/bin/fluent-bit -i tail -p path=/flink-logs/*.log -p multiline.parser=java -o stdout' ] + volumeMounts: + - mountPath: /flink-logs + name: flink-logs + volumes: + - name: flink-logs + emptyDir: { } + jobManager: + replicas: 1 + resource: + memory: "2048m" + cpu: 1 + taskManager: + taskSlots: 2 + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 2 + cancelMode: none + restoreMode: none diff --git a/pom.xml b/pom.xml index 029a069..647868a 100644 --- a/pom.xml +++ b/pom.xml @@ -123,7 +123,7 @@ </artifactSet> <transformers combine.children="append"> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.flink.kubernetes.operator.KubernetesOperatorEntrypoint</mainClass> + <mainClass>org.apache.flink.kubernetes.operator.FlinkOperator</mainClass> </transformer> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> diff --git a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java similarity index 81% rename from src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java rename to 
src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index b7f705f..edb5b74 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/KubernetesOperatorEntrypoint.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -5,7 +5,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService; -import org.apache.flink.kubernetes.operator.controller.FlinkApplicationController; +import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.takes.facets.fork.FkRegex; @@ -18,8 +18,8 @@ import java.io.IOException; /** * Main Class for Flink native k8s operator. */ -public class KubernetesOperatorEntrypoint { - private static final Logger LOG = LoggerFactory.getLogger(KubernetesOperatorEntrypoint.class); +public class FlinkOperator { + private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class); public static void main(String args[]) throws IOException { @@ -33,7 +33,7 @@ public class KubernetesOperatorEntrypoint { Operator operator = new Operator(client, new ConfigurationServiceOverrider(DefaultConfigurationService.instance()) .build()); - operator.register(new FlinkApplicationController(client, namespace)); + operator.register(new FlinkDeploymentController(client, namespace)); operator.installShutdownHook(); operator.start(); diff --git a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java index 4a50a64..4a69696 100644 --- a/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java +++ b/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java @@ -1,9 +1,9 @@ package 
org.apache.flink.kubernetes.operator.Utils; -import org.apache.flink.client.deployment.StandaloneClientFactory; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.internal.SerializationUtils; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -14,86 +14,115 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.util.StringUtils; +import java.io.File; +import java.io.IOException; import java.net.URI; +import java.nio.file.Files; import java.util.Collections; public class FlinkUtils { - public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkApplicationSpec spec) throws Exception { - final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR); - final Configuration effectiveConfig; - if (flinkConfDir != null) { - effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir); - } else { - effectiveConfig = new Configuration(); - } - - // Basic config options - final URI uri = 
new URI(spec.getJarURI()); - effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace); - effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId); - effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET); - // Set rest service exposed type to clusterIP since we will use ingress to access the webui - effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP); - - // Image - if (!StringUtils.isNullOrWhitespaceOnly(spec.getImageName())) { - effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImageName()); - } - if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) { - effectiveConfig.set( - KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, - KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy())); - } - - // Jars - effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString())); - - // Parallelism and Resource - if (spec.getParallelism() > 0) { - effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getParallelism()); - } - if (spec.getJobManagerResource() != null) { - effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManagerResource().getMem()); - effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManagerResource().getCpu()); - } - if (spec.getTaskManagerResource() != null) { - effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManagerResource().getMem()); - effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManagerResource().getCpu()); - } - - // Savepoint - if (!StringUtils.isNullOrWhitespaceOnly(spec.getFromSavepoint())) { - effectiveConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, spec.getFromSavepoint()); - effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, spec.isAllowNonRestoredState()); - } - if 
(!StringUtils.isNullOrWhitespaceOnly(spec.getSavepointsDir())) { - effectiveConfig.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, spec.getSavepointsDir()); - } - - // Dynamic configuration - if (spec.getFlinkConfig() != null && !spec.getFlinkConfig().isEmpty()) { - spec.getFlinkConfig().forEach(effectiveConfig::setString); - } - - return effectiveConfig; - } - - - public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception { - final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID); - final String namespace = config.get(KubernetesConfigOptions.NAMESPACE); - final int port = config.getInteger(RestOptions.PORT); - final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port); - return new RestClusterClient<>( - config, - clusterId, - (c,e) -> new StandaloneClientHAServices(restServerAddress)); - } + public static Configuration getEffectiveConfig(String namespace, String clusterId, FlinkDeploymentSpec spec) throws Exception { + final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR); + final Configuration effectiveConfig; + + if (flinkConfDir != null) { + effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir); + } else { + effectiveConfig = new Configuration(); + } + + effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace); + effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId); + effectiveConfig.set(DeploymentOptions.TARGET, Constants.KUBERNETES_APP_TARGET); + + if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) { + effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage()); + } + + if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) { + effectiveConfig.set( + KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, + KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy())); + } + + if (spec.getFlinkConfiguration() != null && 
!spec.getFlinkConfiguration().isEmpty()) { + spec.getFlinkConfiguration().forEach(effectiveConfig::setString); + } + + // Pod template + if (spec.getPodTemplate() != null) { + effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE, createTempFile(spec.getPodTemplate())); + } + + if (spec.getJobManager() != null) { + if (spec.getJobManager().getResource() != null) { + effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getJobManager().getResource().getMemory()); + effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, spec.getJobManager().getResource().getCpu()); + } + + if (spec.getJobManager().getPodTemplate() != null) { + effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, createTempFile(spec.getJobManager().getPodTemplate())); + } + } + + if (spec.getTaskManager() != null) { + if (spec.getTaskManager().getTaskSlots() > 0) { + effectiveConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots()); + } + + if (spec.getTaskManager().getResource() != null) { + effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), spec.getTaskManager().getResource().getMemory()); + effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, spec.getTaskManager().getResource().getCpu()); + } + + if (spec.getTaskManager().getPodTemplate() != null) { + effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, createTempFile(spec.getTaskManager().getPodTemplate())); + } + } + + if (spec.getJob() != null) { + final URI uri = new URI(spec.getJob().getJarURI()); + effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString())); + + if (spec.getJob().getParallelism() > 0) { + effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism()); + } + } + + return effectiveConfig; + } + + private static String createTempFile(Pod podTemplate) throws IOException { + File tmp = File.createTempFile("podTemplate_", ".yaml"); + 
Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes()); + tmp.deleteOnExit(); + return tmp.getAbsolutePath(); + } + + public static ClusterClient<String> getRestClusterClient(Configuration config) throws Exception { + final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID); + final String namespace = config.get(KubernetesConfigOptions.NAMESPACE); + final int port = config.getInteger(RestOptions.PORT); + final String restServerAddress = String.format("http://%s-rest.%s:%s", clusterId, namespace, port); + return new RestClusterClient<>( + config, + clusterId, + (c, e) -> new StandaloneClientHAServices(restServerAddress)); + } + + public static JobStatus convert(JobStatusMessage message) { + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobId(message.getJobId().toString()); + jobStatus.setJobName(message.getJobName()); + jobStatus.setState(message.getJobState().toString()); + return jobStatus; + } + } diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java deleted file mode 100644 index 54e47f0..0000000 --- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java +++ /dev/null @@ -1,169 +0,0 @@ -package org.apache.flink.kubernetes.operator.controller; - -import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder; -import io.fabric8.kubernetes.api.model.networking.v1.Ingress; -import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder; -import io.fabric8.kubernetes.api.model.networking.v1.IngressRule; -import io.fabric8.kubernetes.client.KubernetesClient; - -import io.javaoperatorsdk.operator.api.reconciler.*; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import 
io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.cli.ApplicationDeployer; -import org.apache.flink.client.deployment.ClusterClientServiceLoader; -import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; -import org.apache.flink.client.deployment.application.ApplicationConfiguration; -import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.kubernetes.operator.Utils.FlinkUtils; -import org.apache.flink.kubernetes.operator.Utils.Constants; -import org.apache.flink.kubernetes.operator.Utils.KubernetesUtils; -import org.apache.flink.kubernetes.operator.crd.FlinkApplication; -import org.apache.flink.kubernetes.operator.crd.spec.Resource; -import org.apache.flink.runtime.client.JobStatusMessage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.flink.kubernetes.operator.Utils.Constants.FLINK_NATIVE_K8S_OPERATOR_NAME; - -@ControllerConfiguration -public class FlinkApplicationController implements Reconciler<FlinkApplication>, ErrorStatusHandler<FlinkApplication>, EventSourceInitializer<FlinkApplication> { - private static final Logger LOG = LoggerFactory.getLogger(FlinkApplicationController.class); - private static final int POLL_PERIOD = 3000; - - private final KubernetesClient kubernetesClient; - - private final Map<String, Tuple2<FlinkApplication, Configuration>> flinkApps; - private final Map<String, String> savepointLocation; - - private final String operatorNamespace; - - public FlinkApplicationController(KubernetesClient kubernetesClient, String namespace) { - 
this.kubernetesClient = kubernetesClient; - this.operatorNamespace = namespace; - - this.flinkApps = new ConcurrentHashMap<>(); - this.savepointLocation = new HashMap<>(); - } - - @Override - public DeleteControl cleanup(FlinkApplication flinkApp, Context context) { - LOG.info("Cleaning up application {}", flinkApp); - kubernetesClient.apps().deployments().inNamespace(flinkApp.getMetadata().getNamespace()).withName(flinkApp.getMetadata().getName()).cascading(true).delete(); - return DeleteControl.defaultDelete(); - } - - @Override - public UpdateControl<FlinkApplication> reconcile(FlinkApplication flinkApp, Context context) { - LOG.info("Reconciling application {}", flinkApp); - final String namespace = flinkApp.getMetadata().getNamespace(); - final String clusterId = flinkApp.getMetadata().getName(); - final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get(); - - final Configuration effectiveConfig; - try { - effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec()); - } catch (Exception e) { - LOG.error("Failed to load configuration", e); - throw new RuntimeException("Failed to load configuration", e); - } - - // Create new Flink application - if (!flinkApps.containsKey(clusterId) && deployment == null) { - // Deploy application - final ClusterClientServiceLoader clusterClientServiceLoader = new DefaultClusterClientServiceLoader(); - final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader); - - final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(flinkApp.getSpec().getMainArgs(), flinkApp.getSpec().getEntryClass()); - try { - deployer.run(effectiveConfig, applicationConfiguration); - } catch (Exception e) { - LOG.error("Failed to deploy cluster {}", clusterId, e); - } - - flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig)); - - updateIngress(); - } else { - if 
(!flinkApps.containsKey(clusterId)) { - LOG.info("Recovering {}", clusterId); - flinkApps.put(clusterId, new Tuple2<>(flinkApp, effectiveConfig)); - return UpdateControl.noUpdate(); - } - // Flink app is deleted externally - if (deployment == null) { - LOG.warn("{} is delete externally.", clusterId); - flinkApps.remove(clusterId); - return UpdateControl.noUpdate(); - } - - FlinkApplication oldFlinkApp = flinkApps.get(clusterId).f0; - - // Trigger a new savepoint - triggerSavepoint(oldFlinkApp, flinkApp, effectiveConfig); - - // TODO support more fields updating, e.g. image, resources - } - return UpdateControl.updateResource(flinkApp); - } - - @Override - public List<EventSource> prepareEventSources(EventSourceContext<FlinkApplication> eventSourceContext) { - // TODO: start status updated -// return List.of(new PerResourcePollingEventSource<>( -// new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD, -// FlinkApplication.class)); - return Collections.emptyList(); - } - - @Override - public Optional<FlinkApplication> updateErrorStatus(FlinkApplication flinkApplication, RetryInfo retryInfo, RuntimeException e) { - //TODO: Set error status - return Optional.empty(); - } - - private void updateIngress() { - final List<IngressRule> ingressRules = new ArrayList<>(); - for (Tuple2<FlinkApplication, Configuration> entry : flinkApps.values()) { - final FlinkApplication flinkApp = entry.f0; - final String clusterId = flinkApp.getMetadata().getName(); - final int restPort = entry.f1.getInteger(RestOptions.PORT); - - final String ingressHost = clusterId + Constants.INGRESS_SUFFIX; - ingressRules.add(new IngressRule(ingressHost, new HTTPIngressRuleValueBuilder().addNewPath().withNewBackend().withNewService().withName(clusterId + Constants.REST_SVC_NAME_SUFFIX).withNewPort(null, restPort).endService().endBackend().withPathType("Prefix").withPath("/").endPath().build())); - } - final Ingress ingress = new 
IngressBuilder().withApiVersion(Constants.INGRESS_API_VERSION).withNewMetadata().withName(FLINK_NATIVE_K8S_OPERATOR_NAME).endMetadata().withNewSpec().withRules(ingressRules).endSpec().build(); - // Get operator deploy - final Deployment deployment = kubernetesClient.apps().deployments().inNamespace(operatorNamespace).withName(FLINK_NATIVE_K8S_OPERATOR_NAME).get(); - if (deployment == null) { - LOG.warn("Could not find deployment {}", FLINK_NATIVE_K8S_OPERATOR_NAME); - } else { - KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress)); - } - kubernetesClient.resourceList(ingress).inNamespace(operatorNamespace).createOrReplace(); - } - - private void triggerSavepoint(FlinkApplication oldFlinkApp, FlinkApplication newFlinkApp, Configuration effectiveConfig) { - final int generation = newFlinkApp.getSpec().getSavepointGeneration(); - if (generation > oldFlinkApp.getSpec().getSavepointGeneration()) { - try (ClusterClient<String> clusterClient = FlinkUtils.getRestClusterClient(effectiveConfig)) { - final CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs(); - jobDetailsFuture.get().forEach(status -> { - LOG.debug("JobStatus for {}: {}", clusterClient.getClusterId(), status); - clusterClient.triggerSavepoint(status.getJobId(), null).thenAccept(path -> savepointLocation.put(status.getJobId().toString(), path)).join(); - }); - } catch (Exception e) { - LOG.warn("Failed to trigger a new savepoint with generation {}", generation); - } - } - } -} diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java new file mode 100644 index 0000000..2967dc0 --- /dev/null +++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -0,0 +1,123 @@ +package org.apache.flink.kubernetes.operator.controller; + +import 
io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import io.javaoperatorsdk.operator.api.reconciler.*;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.Utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reconciler for {@link FlinkDeployment} custom resources.
+ *
+ * <p>Each reconciliation looks up the Kubernetes Deployment that has the same namespace and name
+ * as the custom resource. When none exists, a Flink application cluster is submitted; otherwise
+ * the jobs of the cluster are listed and their statuses are stored on the custom resource.
+ */
+@ControllerConfiguration
+public class FlinkDeploymentController
+        implements Reconciler<FlinkDeployment>,
+                ErrorStatusHandler<FlinkDeployment>,
+                EventSourceInitializer<FlinkDeployment> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
+
+    // Only referenced by the commented-out polling event source in prepareEventSources.
+    private static final int POLL_PERIOD = 3000;
+
+    /** Delay, in seconds, before a resource is reconciled again. */
+    private static final int RESCHEDULE_DELAY_SECONDS = 10;
+
+    private final KubernetesClient kubernetesClient;
+
+    // Initialized empty; not read or written anywhere else in this class yet.
+    private final Map<String, String> savepointLocation;
+
+    // Stored from the constructor argument; not read anywhere else in this class yet.
+    private final String operatorNamespace;
+
+    /**
+     * Creates the controller.
+     *
+     * @param kubernetesClient client used to look up and delete Kubernetes Deployments
+     * @param namespace namespace value kept in {@code operatorNamespace}
+     */
+    public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
+        this.kubernetesClient = kubernetesClient;
+        this.operatorNamespace = namespace;
+        this.savepointLocation = new HashMap<>();
+    }
+
+    /**
+     * Deletes the Kubernetes Deployment that shares the custom resource's namespace and name,
+     * with cascading enabled.
+     *
+     * @param flinkApp custom resource being removed
+     * @param context reconciliation context (unused)
+     * @return {@link DeleteControl#defaultDelete()}
+     */
+    @Override
+    public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
+        final String namespace = flinkApp.getMetadata().getNamespace();
+        final String clusterId = flinkApp.getMetadata().getName();
+        LOG.info("Cleaning up application cluster {}", clusterId);
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(namespace)
+                .withName(clusterId)
+                .cascading(true)
+                .delete();
+        return DeleteControl.defaultDelete();
+    }
+
+    /**
+     * Deploys the application cluster when its Deployment is missing, otherwise refreshes the
+     * job statuses on the custom resource.
+     *
+     * @param flinkApp custom resource to reconcile
+     * @param context reconciliation context (unused)
+     * @return a rescheduling no-update after a deployment attempt or when no job is listed yet,
+     *     a status update when job statuses were read, a plain no-update when reading them failed
+     * @throws RuntimeException if the effective Flink configuration cannot be built
+     */
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
+        final String namespace = flinkApp.getMetadata().getNamespace();
+        final String clusterId = flinkApp.getMetadata().getName();
+        LOG.info("Reconciling application cluster {}", clusterId);
+        final Deployment deployment =
+                kubernetesClient.apps().deployments().inNamespace(namespace).withName(clusterId).get();
+
+        final Configuration effectiveConfig;
+        try {
+            effectiveConfig = FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
+        } catch (Exception e) {
+            LOG.error("Failed to load configuration", e);
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+
+        if (deployment == null) {
+            return deployCluster(flinkApp, clusterId, effectiveConfig);
+        }
+        return refreshJobStatuses(flinkApp, clusterId, effectiveConfig);
+    }
+
+    /** Submits the application cluster and asks to be reconciled again after a fixed delay. */
+    private UpdateControl<FlinkDeployment> deployCluster(
+            FlinkDeployment flinkApp, String clusterId, Configuration effectiveConfig) {
+        LOG.info("Deploying application cluster {}", clusterId);
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader);
+        final ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        flinkApp.getSpec().getJob().getArgs(),
+                        flinkApp.getSpec().getJob().getEntryClass());
+        try {
+            deployer.run(effectiveConfig, applicationConfiguration);
+            // Only report success when run() returned normally.
+            LOG.info("{} deployed", clusterId);
+        } catch (Exception e) {
+            // Deliberately not rethrown: the rescheduled reconciliation attempts the deployment
+            // again if the Deployment is still missing at that point.
+            LOG.error("Failed to deploy {}", clusterId, e);
+        }
+        return UpdateControl.<FlinkDeployment>noUpdate()
+                .rescheduleAfter(RESCHEDULE_DELAY_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /** Lists the cluster's jobs and stores their converted statuses on the custom resource. */
+    private UpdateControl<FlinkDeployment> refreshJobStatuses(
+            FlinkDeployment flinkApp, String clusterId, Configuration effectiveConfig) {
+        LOG.info("Getting job statuses for application cluster {}", clusterId);
+        final FlinkDeploymentStatus flinkAppStatus = new FlinkDeploymentStatus();
+        try (ClusterClient<String> clusterClient = FlinkUtils.getRestClusterClient(effectiveConfig)) {
+            final CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture =
+                    clusterClient.listJobs();
+            final JobStatus[] jobStatuses =
+                    jobDetailsFuture.get().stream().map(FlinkUtils::convert).toArray(JobStatus[]::new);
+            flinkAppStatus.setJobStatuses(jobStatuses);
+            flinkApp.setStatus(flinkAppStatus);
+            LOG.debug("{}", flinkAppStatus);
+            if (jobStatuses.length == 0) {
+                LOG.info("Got no job status for application cluster {} retrying", clusterId);
+                return UpdateControl.<FlinkDeployment>noUpdate()
+                        .rescheduleAfter(RESCHEDULE_DELAY_SECONDS, TimeUnit.SECONDS);
+            }
+            LOG.info("Job statuses updated for application cluster {}", clusterId);
+            return UpdateControl.updateStatus(flinkApp);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            LOG.warn(
+                    "Interrupted while getting the job statuses for application cluster {} giving up",
+                    clusterId,
+                    e);
+            return UpdateControl.noUpdate();
+        } catch (Exception e) {
+            LOG.warn("Failed to get the job statuses for application cluster {} giving up", clusterId, e);
+            return UpdateControl.noUpdate();
+        }
+    }
+
+    /**
+     * Registers additional event sources; currently none.
+     *
+     * @param eventSourceContext context supplied by the operator framework (unused)
+     * @return an empty list
+     */
+    @Override
+    public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> eventSourceContext) {
+        // TODO: start status updater
+//        return List.of(new PerResourcePollingEventSource<>(
+//                new FlinkResourceSupplier, context.getPrimaryCache(), POLL_PERIOD,
+//                FlinkApplication.class));
+        return Collections.emptyList();
+    }
+
+    /**
+     * Error hook of {@link ErrorStatusHandler}; not implemented yet, it only logs a warning.
+     *
+     * @return always {@link Optional#empty()}
+     */
+    @Override
+    public Optional<FlinkDeployment> updateErrorStatus(FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
+        LOG.warn("TODO: handle error status");
+        return Optional.empty();
+    }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
similarity index 63%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index
7b37869..2978588 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplication.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -6,12 +6,12 @@ import io.fabric8.kubernetes.api.model.Namespaced;
 import io.fabric8.kubernetes.client.CustomResource;
 import io.fabric8.kubernetes.model.annotation.Group;
 import io.fabric8.kubernetes.model.annotation.Version;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkApplicationSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkApplicationStatus;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
 @JsonInclude(JsonInclude.Include.NON_NULL)
 @JsonDeserialize()
 @Group("flink.io")
 @Version("v1alpha1")
-public class FlinkApplication extends CustomResource<FlinkApplicationSpec, FlinkApplicationStatus> implements Namespaced {
+public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus> implements Namespaced { // Namespaced custom resource (group "flink.io", version "v1alpha1") pairing FlinkDeploymentSpec with FlinkDeploymentStatus.
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
similarity index 57%
rename from src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
index 1f0115a..ef8a03c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkApplicationList.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeploymentList.java
@@ -2,5 +2,5 @@ package org.apache.flink.kubernetes.operator.crd;
 
 import io.fabric8.kubernetes.client.CustomResourceList;
 
-public class FlinkApplicationList extends CustomResourceList<FlinkApplication> {
+public class FlinkDeploymentList extends CustomResourceList<FlinkDeployment> { // List type for FlinkDeployment resources; adds no members of its own.
 }
diff --git
a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java
new file mode 100644
index 0000000..d58015e
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/CancelMode.java
@@ -0,0 +1,10 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum CancelMode { // Type of JobSpec.cancelMode; each constant is bound to the JSON name in its @JsonProperty. NOTE(review): presumably selects how a running job is stopped - confirm against the code that consumes it.
+    @JsonProperty("savepoint")
+    SAVEPOINT,
+    @JsonProperty("none")
+    NONE
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java
deleted file mode 100644
index b5fc82c..0000000
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkApplicationSpec.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.flink.kubernetes.operator.crd.spec;
-
-import io.fabric8.kubernetes.api.model.KubernetesResource;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-
-@Data
-@NoArgsConstructor
-public class FlinkApplicationSpec {
-    private String imageName;
-    private String imagePullPolicy;
-
-    private String jarURI;
-    private String[] mainArgs = new String[0];
-    private String entryClass;
-
-    private int parallelism;
-
-    private Resource jobManagerResource;
-    private Resource taskManagerResource;
-
-    private String fromSavepoint;
-    private boolean allowNonRestoredState = false;
-    private String savepointsDir;
-    private int savepointGeneration;
-
-    private Map<String, String> flinkConfig;
-}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
new file mode 100644
index 0000000..f65e8e8
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -0,0 +1,22 @@
+package
org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@NoArgsConstructor
+public class FlinkDeploymentSpec { // Spec type of the FlinkDeployment custom resource; accessors and the no-arg constructor come from the Lombok annotations above.
+    private String image;
+    private String imagePullPolicy;
+    private String flinkVersion;
+    private Map<String, String> flinkConfiguration;
+    private Pod podTemplate;
+    private JobManagerSpec jobManager;
+    private TaskManagerSpec taskManager;
+    private JobSpec job; // FlinkDeploymentController reads the entry class and program arguments from here.
+    private Map<String, String> logging;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
new file mode 100644
index 0000000..c11e65f
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class JobManagerSpec { // Type of FlinkDeploymentSpec.jobManager.
+    private Resource resource;
+    private int replicas; // Primitive int, so it is 0 when left unset.
+    private Pod podTemplate;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
new file mode 100644
index 0000000..dfd8adc
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -0,0 +1,16 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class JobSpec { // Type of FlinkDeploymentSpec.job.
+    private String jarURI;
+    private int parallelism; // Primitive int, so it is 0 when left unset.
+    private String entryClass; // Passed to ApplicationConfiguration by FlinkDeploymentController.
+    private String[] args = new String[0]; // Defaults to an empty array; passed to ApplicationConfiguration by FlinkDeploymentController.
+    private RestoreMode restoreMode;
+    private CancelMode cancelMode;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
index e718860..7b190cc 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
@@ -5,8 +5,8 @@ import lombok.*;
 
 @Data
 @NoArgsConstructor
-public class Resource implements KubernetesResource {
+public class Resource { // Plain Lombok data class; it no longer implements KubernetesResource. Used as the type of JobManagerSpec.resource and TaskManagerSpec.resource.
     private double cpu;
     // 1024m, 1g
-    private String mem;
+    private String memory; // Renamed from "mem". NOTE(review): the comment above presumably lists example values for this field - confirm.
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java
new file mode 100644
index 0000000..5365430
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/RestoreMode.java
@@ -0,0 +1,12 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RestoreMode { // Type of JobSpec.restoreMode; each constant is bound to the JSON name in its @JsonProperty. NOTE(review): presumably selects which state a job starts from - confirm against the code that consumes it.
+    @JsonProperty("savepoint")
+    SAVEPOINT,
+    @JsonProperty("last-state")
+    LAST_STATE,
+    @JsonProperty("none")
+    NONE
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
new file mode 100644
index 0000000..a995098
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -0,0 +1,14 @@
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+public class TaskManagerSpec { // Type of FlinkDeploymentSpec.taskManager.
+    private int taskSlots; // Primitive int, so it is 0 when left unset.
+    private Resource resource;
+    private Pod podTemplate;
+}
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
similarity index 81%
rename from
src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java
rename to src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 4b27f1b..88b1209 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkApplicationStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -5,6 +5,6 @@ import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
-public class FlinkApplicationStatus {
+public class FlinkDeploymentStatus { // Status type of the FlinkDeployment custom resource; FlinkDeploymentController fills jobStatuses from the cluster's job list.
     private JobStatus[] jobStatuses;
 }
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index 97e583e..369b39a 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -5,4 +5,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender
 appender.console.name = ConsoleAppender
 appender.console.type = CONSOLE
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] %msg%n%throwable}