Hey Till,

Thank you for responding.

I've already read the links you sent, but they aren't enough; they don't 
provide a good solution for production.

Standalone Kubernetes is not a good approach for production, for three main 
reasons (in my opinion):

  *   TMs are defined as a Deployment, which means they stay up and running 
even after the JobManager (a k8s Job) has completed.
  *   With these configurations, Flink is not aware of the standalone 
Kubernetes cluster, which means you have to clean up the resources yourself.
  *   IMHO, dealing with YAML files in production is bad practice; deploying an 
application cluster via YAML files at runtime is not a good approach. (How 
exactly? Terraform, a Kubernetes client, kubectl?)

Native k8s is the right approach since it terminates all dynamic resources; 
however, the documentation shows only deployment via the Flink CLI, which again 
is not good practice in production.

Another solution is to use a Kubernetes operator (e.g. 
https://github.com/wangyang0918/flink-native-k8s-operator); however, the 
operator expects CRDs, which are defined by YAML files, and triggering a new 
Flink app is also done via YAML files.

Unfortunately, the documentation doesn't elaborate much on the Flink client.

After digging into and debugging the operator lib above, I found out that the 
Flink client can deploy an application cluster.

Here is a simple Java program that creates an application cluster with dynamic 
resource allocation (using the Flink client & Flink Kubernetes client).

Dependencies (e.g. flink.version=1.12.1, scala.binary.version=2.12):

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-kubernetes_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-core</artifactId>
   <version>${flink.version}</version>
</dependency>

Java Code:

@Log4j2
public class TestFlinkClientToAppCluster {

    public static void main(String[] args) {
        log.info("Client started");
        // Deploy the application cluster
        final ClusterClientServiceLoader clusterClientServiceLoader =
                new DefaultClusterClientServiceLoader();
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);
        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(new String[]{}, null);
        try {
            deployer.run(getEffectiveConfig(), applicationConfiguration);
            log.info("Finished");
        } catch (Exception e) {
            log.error("Failed to deploy cluster {}", "flink-test", e);
        }
    }
}

where getEffectiveConfig() returns a Configuration object representing a 
flink-conf.yaml file with all the necessary parameters.

For example:



Configuration effectiveConfig = new Configuration();
// or load from file: GlobalConfiguration.loadConfiguration(<path>);
URI uri = new URI("local://path/to/artifact");

effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, "default");
effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, "flink-test");
effectiveConfig.set(DeploymentOptions.TARGET, "kubernetes-application");
effectiveConfig.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
        KubernetesConfigOptions.ServiceExposedType.ClusterIP);
effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE,
        "app-cluster-job-manager:1.0.1");
effectiveConfig.set(KubernetesConfigOptions.KUBERNETES_ENTRY_PATH,
        "/docker-entrypoint.sh");
effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));

If specific configurations for the TM or JM are needed, you can set them like 
this:

effectiveConfig.setString(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1024m");
effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_CPU, 500.0);
effectiveConfig.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "2048m");
effectiveConfig.set(KubernetesConfigOptions.TASK_MANAGER_CPU, 500.0);

If the application runs outside K8s, it might throw an error indicating that 
the pod failed to find the REST endpoint (it looks for the k8s format 
<cluster-name>-rest.<namespace>), but the cluster will be deployed anyway.

I deployed the application on k8s (docker-desktop).

  *   Make sure you provide the kube config. Since the Flink Kubernetes client 
is running within k8s, it needs credentials to communicate with the API server 
and create dynamic resources; otherwise, the following exception is raised: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target
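If the kubeconfig lives at a non-standard path, an alternative to the 
KUBECONFIG environment variable is to point the Flink Kubernetes client at it 
directly in the effective configuration. A sketch, assuming the 
kubernetes.config.file option (KubernetesConfigOptions.KUBE_CONFIG_FILE) is 
available in your Flink version; the path below is a hypothetical example:

```java
// Assumption: KubernetesConfigOptions.KUBE_CONFIG_FILE ("kubernetes.config.file")
// exists in this Flink version; "/opt/files/config" is a hypothetical path.
effectiveConfig.setString(KubernetesConfigOptions.KUBE_CONFIG_FILE.key(),
        "/opt/files/config");
```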

Dockerfile for the program above (config is the local Kubernetes credentials 
file located in ~/.kube/config; I copied it to the project dir):

FROM openjdk:11-jdk-slim-buster
COPY target/<jar-name>.jar /
RUN mkdir -p /opt/files
ADD log4j2.xml /opt/files/log4j2.xml
ADD config /opt/files/config
ENV KUBECONFIG=/opt/files/config
ENTRYPOINT ["java", "-Dlog4j.configurationFile=/opt/files/log4j2.xml", "-jar", "/<jar-name>.jar"]

Hope that helps,

Tamir.





________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: Thursday, March 11, 2021 3:40 PM
To: Tamir Sagi <tamir.s...@niceactimize.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Application cluster - Best Practice




Hi Tamir,

1. How you start a cluster in application mode depends on how you deploy Flink. 
If you want to use Flink's standalone application mode, then you have to deploy 
a new cluster for every job you want to run [1, 2, 3]. What a standalone 
application cluster needs is the user code jar and a pointer which class to 
run. So if you want to deploy a standalone cluster on Docker or Kubernetes, 
then you either add the user code to your image, mount it as a volume or 
specify an init container (K8s case) to download the user code jar from 
somewhere.

If you want to deploy on K8s and are able to use Flink's native integration 
(meaning that Flink can talk with the K8s cluster), you can also use the client 
to submit a new application cluster [4]. But also here, you have to make the 
user code available to your container in some way.

2. If you deploy your application cluster in standalone mode, then Flink won't 
automatically terminate the TaskManagers. This only works when using one of 
Flink's active deployments (K8s or Yarn). Hence, you either have to wait for 
the registration timeout of the TaskManager or stop them explicitly.

3. No, TaskManagers don't have to stay alive. The fact that they stayed alive 
is caused by the fact that you deployed Flink in standalone mode where Flink 
cannot start and stop TaskManagers.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#application-mode-on-docker
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/#application-mode
[4] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#application-mode

Cheers,
Till

On Sat, Mar 6, 2021 at 6:16 PM Tamir Sagi 
<tamir.s...@niceactimize.com<mailto:tamir.s...@niceactimize.com>> wrote:
Hey All,

I'm running an application cluster (Flink 1.11.1) on top of Kubernetes 
(currently running locally using docker-desktop), deployed via a terraform 
module, with 2 task managers.

I followed these instructions (well written):
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster

I have successfully executed the batch job; the Kubernetes job is marked as 
completed.

I was reading the following article
https://flink.apache.org/news/2020/07/14/application-mode.html


I have several questions

  1.  What is the best practice for triggering jobs on the fly? In other 
words, how do I submit new jobs at runtime in application mode? (In a session 
cluster I could submit a job via the Flink client.)

  2.  Once the job is completed, I get the following message inside Task manager
[2021-03-06T17:06:55,054][Info] {} [o.a.f.r.t.TaskExecutor]: Could not resolve 
ResourceManager address 
akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*,
 retrying in 10000 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@data-aggregation-flink-jobmanager:6123/user/rpc/resourcemanager_*.

It tries to register with the resource manager until it crashes with the error:
[2021-03-06T17:10:47,437][Error] {} [o.a.f.r.t.TaskManagerRunner]: Fatal error 
occurred while executing the TaskManager. Shutting it 
down...org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
 Could not register at the ResourceManager within the specified maximum 
registration duration 300000 ms. This indicates a problem with this instance. 
Terminating now.

Is that normal behavior?


  3.  In application mode, does the Task Manager have to stay alive after the 
job has been completed?


Thanks,
Tamir.

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


