I agree with Gyula Fora, 

In our case, we have a client-machine in the middle between our YARN cluster 
and some backend services, which can not be reached directly from the cluster 
nodes. On application startup, we connect to some external systems, get some 
information crucial for the job runtime and finally build up the job graph to 
be committed. 

It is true that we could workaround this, but it would be pretty annoying to 
connect to the remote services, collect the data, upload it to HDFS, start the 
job and make sure, housekeeping of those files is also done at some later time. 

The current behavior also corresponds to the behavior of Sparks driver mode, 
which made the transition from Spark to Flink easier for us. 

But I see the point, especially in terms of Kubernetes and would thus also vote 
for an opt-in solution, being the client compilation the default and having an 
option for the per-program mode as well. 

Best regards 


Von: "Flavio Pompermaier" <pomperma...@okkam.it> 
An: "Yang Wang" <danrtsey...@gmail.com> 
CC: "tison" <wander4...@gmail.com>, "Newport, Billy" <billy.newp...@gs.com>, 
"Paul Lam" <paullin3...@gmail.com>, "SHI Xiaogang" <shixiaoga...@gmail.com>, 
"dev" <d...@flink.apache.org>, "user" <user@flink.apache.org> 
Gesendet: Donnerstag, 31. Oktober 2019 10:45:36 
Betreff: Re: [DISCUSS] Semantic and implementation of per-job mode 

Hi all, 
we're using a lot the multiple jobs in one program and this is why: when you 
fetch data from a huge number of sources and, for each source, you do some 
transformation and then you want to write into a single directory the union of 
all outputs (this assumes you're doing batch). When the number of sources is 
large, if you want to do this in a single job, the graph becomes very big and 
this is a problem for several reasons: 


    * too many substasks /threadsi per slot 
    * increase of back pressure 
    * if a single "sub-job" fails all the job fails..this is very annoying if 
this happens after a half a day for example 
    * In our use case, the big-graph mode takes much longer than running each 
job separately (but maybe this is true only if you don't have much hardware 
resources) 
    * debugging the cause of a fail could become a daunting task if the job 
graph is too large 
    * we faced may strange errors when trying to run the single big-job mode 
(due to serialization corruption) 

So, summarizing our overall experience with Flink batch is: the easier is the 
job graph the better! 

Best, 
Flavio 


On Thu, Oct 31, 2019 at 10:14 AM Yang Wang < [ mailto:danrtsey...@gmail.com | 
danrtsey...@gmail.com ] > wrote: 



Thanks for tison starting this exciting discussion. We also suffer a lot from 
the per job mode. 
I think the per-job cluster is a dedicated cluster for only one job and will 
not accept more other 
jobs. It has the advantage of one-step submission, do not need to start 
dispatcher first and 
then submit the job. And it does not matter where the job graph is generated 
and job is submitted. 
Now we have two cases. 

(1) Current Yarn detached cluster. The job graph is generated in client and 
then use distributed 
cache to flink master container. And the MiniDispatcher uses 
`FileJobGraphRetrieve` to get it. 
The job will be submitted at flink master side. 


(2) Standalone per job cluster. User jars are already built into image. So the 
job graph will be 
generated at flink master side and `ClasspathJobGraphRetriver` is used to get 
it. The job will 
also be submitted at flink master side. 


For the (1) and (2), only one job in user program could be supported. The per 
job means 
per job-graph, so it works just as expected. 


Tison suggests to add a new mode "per-program”. The user jar will be 
transferred to flink master 
container, and a local client will be started to generate job graph and submit 
job. I think it could 
cover all the functionality of current per job, both (1) and (2). Also the 
detach mode and attach 
mode could be unified. We do not need to start a session cluster to simulate 
per job for multiple parts. 


I am in favor of the “per-program” mode. Just two concerns. 
1. How many users are using multiple jobs in one program? 
2. Why do not always use session cluster to simulate per job? Maybe one-step 
submission 
is a convincing reason. 


Best, 
Yang 

tison < [ mailto:wander4...@gmail.com | wander4...@gmail.com ] > 于2019年10月31日周四 
上午9:18写道: 

BQ_BEGIN

Thanks for your attentions! 

[ mailto:shixiaoga...@gmail.com | @shixiaoga...@gmail.com ] 

Yes correct. We try to avoid jobs affect one another. Also a local 
ClusterClient 
in case saves the overhead about retry before leader elected and persist 
JobGraph before submission in RestClusterClient as well as the net cost. 

[ mailto:paullin3...@gmail.com | @Paul Lam ] 

1. Here is already a note[1] about multiple part jobs. I am also confused a bit 
on this concept at first :-) Things go in similar way if you program contains 
the 
only JobGraph so that I think per-program acts like per-job-graph in this case 
which provides compatibility for many of one job graph program. 

Besides, we have to respect user program which doesn't with current 
implementation because we return abruptly when calling env#execute which 
hijack user control so that they cannot deal with the job result or the future 
of 
it. I think this is why we have to add a detach/attach option. 

2. For compilation part, I think it could be a workaround that you upload those 
resources in a commonly known address such as HDFS so that compilation 
can read from either client or cluster. 

Best, 
tison. 

[1] [ 
https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
 | 
https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
 ] 


Newport, Billy < [ mailto:billy.newp...@gs.com | billy.newp...@gs.com ] > 
于2019年10月30日周三 下午10:41写道: 

BQ_BEGIN



We execute multiple job graphs routinely because we cannot submit a single 
graph without it blowing up. I believe Regina spoke to this in Berlin during 
her talk. We instead if we are processing a database ingestion with 200 tables 
in it, we do a job graph per table rather than a single job graph that does all 
tables instead. A single job graph can be in the tens of thousands of nodes in 
our largest cases and we have found flink (as os 1.3/1.6.4) cannot handle 
graphs of that size. We’re currently testing 1.9.1 but have not retested the 
large graph scenario. 



Billy 






From: Paul Lam [mailto: [ mailto:paullin3...@gmail.com | paullin3...@gmail.com 
] ] 
Sent: Wednesday, October 30, 2019 8:41 AM 
To: SHI Xiaogang 
Cc: tison; dev; user 
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode 




Hi, 





Thanks for starting the discussion. 





WRT the per-job semantic, it looks natural to me that per-job means 
per-job-graph, 


because in my understanding JobGraph is the representation of a job. Could you 


share some use case in which a user program should contain multiple job graphs? 





WRT the per-program mode, I’m also in flavor of a unified cluster-side 
execution 


for user program, so +1 from my side. 





But I think there may be some values for the current per-job mode: we now have 


some common resources available on the client machine that would be read by 
main 


methods in user programs. If migrated to per-program mode, we must explicitly 


set the specific resources for each user program and ship them to the cluster, 


it would be a bit inconvenient. Also, as the job graph is compiled at the 
client, 


we can recognize the errors caused by user code before starting the cluster 


and easily get access to the logs. 





Best, 


Paul Lam 






在 2019 年 10 月 30 日, 16:22 , SHI Xiaogang < [ mailto:shixiaoga...@gmail.com | 
shixiaoga...@gmail.com ] > 写道: 





Hi 





Thanks for bringing this. 





The design looks very nice to me in that 


1. In the new per-job mode, we don't need to compile user programs in the 
client and can directly run user programs with user jars. That way, it's easier 
for resource isolation in multi-tenant platforms and is much safer. 


2. The execution of user programs can be unified in session and per-job modes. 
In session mode, user jobs are submitted via a remote ClusterClient while in 
per-job mode user jobs are submitted via a local ClusterClient. 





Regards, 


Xiaogang 





tison < [ mailto:wander4...@gmail.com | wander4...@gmail.com ] > 于 2019 年 10 月 
30 日周三 下午 3:30 写道: 

BQ_BEGIN



(CC user list because I think users may have ideas on how per-job mode should 
look like) 





Hi all, 

In the discussion about Flink on k8s[1] we encounter a problem that opinions 
diverge in how so-called per-job mode works. This thread is aimed at stating 
a dedicated discussion about per-job semantic and how to implement it. 

**The AS IS per-job mode** 

* in standalone deployment, we bundle user jar with Flink jar, retrieve 
JobGraph which is the very first JobGraph from user program in classpath, 
and then start a Dispatcher with this JobGraph preconfigured, which 
launches it as "recovered" job. 

* in YARN deployment, we accept submission via CliFrontend, extract JobGraph 
which is the very first JobGraph from user program submitted, serialize 
the JobGraph and upload it to YARN as resource, and then when AM starts, 
retrieve the JobGraph as resource and start Dispatcher with this JobGraph 
preconfigured, follows are the same. 

Specifically, in order to support multiple parts job, if YARN deployment 
configured as "attached", it starts a SessionCluster, proceeds the progress 
and shutdown the cluster on job finished. 

**Motivation** 

The implementation mentioned above, however, suffers from problems. The major 
two of them are 1. only respect the very first JobGraph from user program 2. 
compile job in client side 

1. Only respect the very first JobGraph from user program 

There is already issue about this topic[2]. As we extract JobGraph from user 
program by hijacking Environment#execute we actually abort any execution 
after the first call to #execute. Besides it surprises users many times that 
any logic they write in the program is possibly never executed, here the 
problem is that the semantic of "job" from Flink perspective. I'd like to say 
in current implementation "per-job" is actually "per-job-graph". However, 
in practices since we support jar submission it is "per-program" semantic 
wanted. 

2. Compile job in client side 

Well, standalone deployment is not in the case. But in YARN deployment, we 
compile job and get JobGraph in client side, and then upload it to YARN. 
This approach, however, somehow breaks isolation. We have observed that user 
program contains exception handling logic which call System.exit in main 
method, which causes a compilation of the job exit the whole client at once. 
It is a critical problem if we manage multiple Flink job in a unique platform. 
In this case, it shut down the whole service. 

Besides there are many times I was asked why per-job mode doesn't run 
"just like" session mode but with a dedicated cluster. It might imply that 
current implementation mismatches users' demand. 

**Proposal** 

In order to provide a "per-program" semantic mode which acts "just like" 
session 
mode but with a dedicated cluster, I propose a workflow as below. It acts like 
starting a drive on cluster but is not a general driver solution as proposed 
here[3], the main purpose of the workflow below is for providing a 
"per-program" 
semantic mode. 

*From CliFrontend* 

1. CliFrontend receives submission, gathers all configuration and starts a 
corresponding ClusterDescriptor. 


2. ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint 
while shipping resources including user program. 


3. ProgramClusterEntrypoint#main contains logic starting components including 
Standalone Dispatcher, configuring user program to start a RpcClusterClient, 
and then invoking main method of user program. 


4. RpcClusterClient acts like MiniClusterClient which is able to submit the 
JobGraph after leader elected so that we don't fallback to round-robin or 
fail submission due to no leader. 


5. Whether or not deliver job result depends on user program logic, since we 
can already get a JobClient from execute. ProgramClusterEntrypoint exits on 
user program exits and all jobs submitted globally terminate. 

This way fits in the direction of FLIP-73 because strategy starting a 
RpcClusterClient can be regarded as a special Executor. After 
ProgramClusterEntrypoint#main starts a Cluster, it generates and passes 
configuration to 
user program so that when Executor generated, it knows to use a 
RpcClusterClient 
for submission and the address of Dispatcher. 

**Compatibility** 

In my opinion this mode can be totally an add-on to current codebase. We 
actually don't replace current per-job mode with so-called "per-program" mode. 
It happens that current per-job mode would be useless if we have such 
"per-program" mode so that we possibly deprecate it for preferring the other. 

I'm glad to discuss more into details if you're interested in, but let's say 
we'd better first reach a consensus on the overall design :-) 

Looking forward to your reply! 

Best, 
tison. 

[1] [ 
https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D9953&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=p428wH8eWmBwyjHaE0vClbGi51CQxgjJ6Js3X9Kyr04&e=
 | 
https://issues.apache.org/jira/browse/FLINK-9953 ] 
[2] [ 
https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D10879&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=mEzfvloedca1XW6pqI9LrR--IKhrkg-YmFMXRULqVSQ&e=
 | 
https://issues.apache.org/jira/browse/FLINK-10879 ] 
[3] [ 
https://urldefense.proofpoint.com/v2/url?u=https-3A__docs.google.com_document_d_1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY_edit-23&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=XNVcSV52D3KneNkZgP7tgo9Y4uBm0jsN0RfYaelP7JM&e=
 | 
https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
 ] 









Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
[ http://www.gs.com/privacy-notices | 
www.gs.com/privacy-notices ] 

BQ_END


BQ_END


BQ_END


Reply via email to