(CC'ing the user list because I think users may have ideas on what per-job
mode should look like)

Hi all,

In the discussion about Flink on k8s[1] we ran into the problem that opinions
diverge on how the so-called per-job mode works. This thread is aimed at
starting a dedicated discussion about per-job semantics and how to implement
them.

**The AS IS per-job mode**

* In standalone deployment, we bundle the user jar with the Flink jar,
retrieve the very first JobGraph from the user program on the classpath,
and then start a Dispatcher with this JobGraph preconfigured, which
launches it as a "recovered" job.

* In YARN deployment, we accept the submission via CliFrontend, extract the
very first JobGraph from the submitted user program, serialize the JobGraph
and upload it to YARN as a resource. When the AM starts, it retrieves the
JobGraph from that resource and starts a Dispatcher with this JobGraph
preconfigured; the rest is the same as in standalone deployment.

As a special case, in order to support jobs with multiple parts, if the YARN
deployment is configured as "attached", it starts a SessionCluster, runs the
program, and shuts down the cluster when the job finishes.

**Motivation**

The implementation described above, however, suffers from problems. The two
major ones are: 1. it only respects the very first JobGraph from the user
program; 2. it compiles the job on the client side.

1. Only respect the very first JobGraph from user program

There is already an issue about this topic[2]. As we extract the JobGraph
from the user program by hijacking Environment#execute, we actually abort any
execution after the first call to #execute. It has surprised users many times
that logic they write in the program is possibly never executed. The
underlying problem is the semantics of "job" from Flink's perspective. I'd
say that in the current implementation "per-job" is actually
"per-job-graph". In practice, however, since we support jar submission, what
users want is "per-program" semantics.
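
To make the "per-job-graph" behaviour concrete, here is a minimal,
self-contained sketch. It does not use Flink's classes: SketchJobGraph,
ProgramAbortSignal, ExtractingEnvironment and FirstJobGraphOnlyDemo are
hypothetical stand-ins, and aborting via a thrown signal is only an
assumption about how the hijacking could work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a JobGraph; it only carries a name. */
final class SketchJobGraph {
    final String name;
    SketchJobGraph(String name) { this.name = name; }
}

/** Hypothetical signal used to unwind the user program after the first execute(). */
final class ProgramAbortSignal extends Error {}

/** Hypothetical environment that hijacks execute() to extract a JobGraph. */
final class ExtractingEnvironment {
    SketchJobGraph captured;

    void execute(String jobName) {
        captured = new SketchJobGraph(jobName);
        throw new ProgramAbortSignal(); // nothing after this call runs
    }
}

final class FirstJobGraphOnlyDemo {
    /** A user program with two jobs and some logic between them. */
    static void userMain(ExtractingEnvironment env, List<String> log) {
        log.add("before first execute");
        env.execute("job-1");
        log.add("between jobs"); // never reached during extraction
        env.execute("job-2");    // never reached during extraction
    }

    /** Runs the user program only far enough to capture the first JobGraph. */
    static List<String> extract(ExtractingEnvironment env) {
        List<String> log = new ArrayList<>();
        try {
            userMain(env, log);
        } catch (ProgramAbortSignal expected) {
            // Extraction is done; the rest of the user program is dropped.
        }
        return log;
    }

    public static void main(String[] args) {
        ExtractingEnvironment env = new ExtractingEnvironment();
        List<String> log = extract(env);
        System.out.println("captured: " + env.captured.name);
        System.out.println("executed: " + log);
    }
}
```

In this sketch only "job-1" is captured, and the statement between the two
execute calls never runs, which is the surprise described above.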

2. Compile job in client side

Standalone deployment is not affected here. But in YARN deployment, we
compile the job and get the JobGraph on the client side, and then upload it
to YARN. This approach breaks isolation. We have observed user programs
whose exception handling logic calls System.exit in the main method, so that
compiling the job exits the whole client at once. This is a critical problem
if we manage multiple Flink jobs on a single platform: in that case, it shuts
down the whole service.

Besides, I have been asked many times why per-job mode doesn't run
"just like" session mode but with a dedicated cluster. This suggests that
the current implementation does not match users' expectations.

**Proposal**

In order to provide a "per-program" semantic mode which acts "just like"
session mode but with a dedicated cluster, I propose the workflow below. It
acts like starting a driver on the cluster, but it is not a general driver
solution as proposed here[3]; the main purpose of the workflow below is to
provide a "per-program" semantic mode.

*From CliFrontend*

1. CliFrontend receives the submission, gathers all configuration and starts
the corresponding ClusterDescriptor.

2. ClusterDescriptor deploys a cluster with main class
ProgramClusterEntrypoint while shipping resources including the user program.

3. ProgramClusterEntrypoint#main contains the logic for starting components
including a Standalone Dispatcher, configuring the user program to start a
RpcClusterClient, and then invoking the main method of the user program.

4. RpcClusterClient acts like MiniClusterClient in that it is able to submit
the JobGraph after a leader is elected, so that we don't fall back to
round-robin or fail the submission due to no leader.

5. Whether or not the job result is delivered depends on the user program
logic, since we can already get a JobClient from execute.
ProgramClusterEntrypoint exits once the user program exits and all submitted
jobs have globally terminated.
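
Steps 3 to 5 could look roughly like the sketch below. It is only an
illustration under stated assumptions: every class here (SketchDispatcher,
SketchRpcClusterClient, SketchProgramClusterEntrypoint) is a hypothetical
stand-in written with plain java.util.concurrent, job execution and leader
election are faked with futures, and the user program is modelled as a
Consumer instead of a reflective call to its main method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for the Dispatcher; not Flink's real class. */
final class SketchDispatcher {
    /** Completes once this Dispatcher has been elected leader. */
    final CompletableFuture<Void> leaderElected = new CompletableFuture<>();

    /** Pretends to run a job and reports its terminal state. */
    CompletableFuture<String> run(String jobName) {
        return CompletableFuture.supplyAsync(() -> jobName + " FINISHED");
    }
}

/** Hypothetical stand-in for the proposed RpcClusterClient. */
final class SketchRpcClusterClient {
    private final SketchDispatcher dispatcher;
    /** Every submission, so the entrypoint can wait for all of them. */
    final List<CompletableFuture<String>> submitted = new CopyOnWriteArrayList<>();

    SketchRpcClusterClient(SketchDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    /** Step 4: the submission is deferred until a leader is known. */
    CompletableFuture<String> submitJob(String jobName) {
        CompletableFuture<String> result =
                dispatcher.leaderElected.thenCompose(ignored -> dispatcher.run(jobName));
        submitted.add(result);
        return result;
    }
}

final class SketchProgramClusterEntrypoint {
    /** Steps 3 and 5: start components, run the user program, wait for all jobs. */
    static List<String> run(Consumer<SketchRpcClusterClient> userMain) {
        SketchDispatcher dispatcher = new SketchDispatcher();
        SketchRpcClusterClient client = new SketchRpcClusterClient(dispatcher);
        // Leader election finishes asynchronously, possibly after the first submission.
        CompletableFuture.runAsync(() -> dispatcher.leaderElected.complete(null));

        userMain.accept(client); // the whole user program runs, not just the first execute

        // Exit only after the user program returned and every job is terminal.
        List<String> terminalStates = new ArrayList<>();
        for (CompletableFuture<String> job : client.submitted) {
            terminalStates.add(job.join());
        }
        return terminalStates;
    }

    public static void main(String[] args) {
        List<String> states = run(client -> {
            client.submitJob("job-1").join(); // the user program chooses to wait
            client.submitJob("job-2");        // fire and forget
        });
        System.out.println(states);
    }
}
```

The user program decides for itself whether to wait on a job result, while
the entrypoint still waits for the fire-and-forget job before it exits.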

This approach fits the direction of FLIP-73 because the strategy of starting
a RpcClusterClient can be regarded as a special Executor. After
ProgramClusterEntrypoint#main starts a cluster, it generates and passes
configuration to the user program so that when the Executor is created, it
knows to use a RpcClusterClient for submission and knows the address of the
Dispatcher.
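
As a rough illustration of passing configuration to the user program, the
sketch below picks an executor from a plain map. The keys
"sketch.executor.target" and "sketch.dispatcher.address", the value
"rpc-cluster", and the types SketchExecutor and SketchExecutorFactory are
all made up for this example; they are not Flink or FLIP-73 names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical executor abstraction; describe() only reports what was chosen. */
interface SketchExecutor {
    String describe();
}

final class SketchExecutorFactory {
    // Hypothetical configuration keys, invented for this sketch.
    static final String TARGET_KEY = "sketch.executor.target";
    static final String ADDRESS_KEY = "sketch.dispatcher.address";

    /** What the entrypoint could hand to the user program after starting the cluster. */
    static Map<String, String> configForUserProgram(String dispatcherAddress) {
        Map<String, String> config = new HashMap<>();
        config.put(TARGET_KEY, "rpc-cluster");
        config.put(ADDRESS_KEY, dispatcherAddress);
        return config;
    }

    /** Chooses the executor when the user program creates its environment. */
    static SketchExecutor fromConfig(Map<String, String> config) {
        String target = config.getOrDefault(TARGET_KEY, "local");
        if ("rpc-cluster".equals(target)) {
            String address = config.get(ADDRESS_KEY);
            if (address == null) {
                throw new IllegalArgumentException(ADDRESS_KEY + " is required for rpc-cluster");
            }
            return () -> "rpc-cluster@" + address;
        }
        return () -> "local";
    }

    public static void main(String[] args) {
        Map<String, String> config = configForUserProgram("dispatcher-host:6123");
        System.out.println(fromConfig(config).describe());
    }
}
```

The user program itself stays unchanged; only the configuration it receives
decides how its jobs are submitted.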

**Compatibility**

In my opinion this mode can be purely an add-on to the current codebase. We
don't actually replace the current per-job mode with the so-called
"per-program" mode. It may turn out that the current per-job mode becomes
useless once we have such a "per-program" mode, in which case we could
deprecate it in favor of the new one.

I'm glad to discuss this in more detail if you're interested, but I think
we'd better first reach a consensus on the overall design :-)

Looking forward to your reply!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-9953
[2] https://issues.apache.org/jira/browse/FLINK-10879
[3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
