Hi all,

Regarding the compilation part:

I think there are up and downsides to building the Flink job (running the
main method) on the client side, however since this is the current way of
doing it we should have a very powerful reason to change the default
behaviour.
While there is a possible workaround for reading local resources on the
client (upload to hdfs) if this is a common pattern then it makes no sense
to force users to this workaround.

If we want to introduce server-side building of the jobs we should provide
both options for the users to choose from in my opinion and client side
should probably stay the default.

Cheers,
Gyula



On Thu, Oct 31, 2019 at 2:18 AM tison <wander4...@gmail.com> wrote:

> Thanks for your attentions!
>
> @shixiaoga...@gmail.com <shixiaoga...@gmail.com>
>
> Yes correct. We try to avoid jobs affect one another. Also a local
> ClusterClient
> in case saves the overhead about retry before leader elected and persist
> JobGraph before submission in RestClusterClient as well as the net cost.
>
> @Paul Lam <paullin3...@gmail.com>
>
> 1. Here is already a note[1] about multiple part jobs. I am also confused
> a bit
> on this concept at first :-) Things go in similar way if you program
> contains the
> only JobGraph so that I think per-program acts like per-job-graph in this
> case
> which provides compatibility for many of one job graph program.
>
> Besides, we have to respect user program which doesn't with current
> implementation because we return abruptly when calling env#execute which
> hijack user control so that they cannot deal with the job result or the
> future of
> it. I think this is why we have to add a detach/attach option.
>
> 2. For compilation part, I think it could be a workaround that you upload
> those
> resources in a commonly known address such as HDFS so that compilation
> can read from either client or cluster.
>
> Best,
> tison.
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>
>
> Newport, Billy <billy.newp...@gs.com> 于2019年10月30日周三 下午10:41写道:
>
>> We execute multiple job graphs routinely because we cannot submit a
>> single graph without it blowing up. I believe Regina spoke to this in
>> Berlin during her talk. We instead if we are processing a database
>> ingestion with 200 tables in it, we do a job graph per table rather than a
>> single job graph that does all tables instead. A single job graph can be in
>> the tens of thousands of nodes in our largest cases and we have found flink
>> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently
>> testing 1.9.1 but have not retested the large graph scenario.
>>
>>
>>
>> Billy
>>
>>
>>
>>
>>
>> *From:* Paul Lam [mailto:paullin3...@gmail.com]
>> *Sent:* Wednesday, October 30, 2019 8:41 AM
>> *To:* SHI Xiaogang
>> *Cc:* tison; dev; user
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>>
>>
>> Hi,
>>
>>
>>
>> Thanks for starting the discussion.
>>
>>
>>
>> WRT the per-job semantic, it looks natural to me that per-job means
>> per-job-graph,
>>
>> because in my understanding JobGraph is the representation of a job.
>> Could you
>>
>> share some use case in which a user program should contain multiple job
>> graphs?
>>
>>
>>
>> WRT the per-program mode, I’m also in flavor of a unified cluster-side
>> execution
>>
>> for user program, so +1 from my side.
>>
>>
>>
>> But I think there may be some values for the current per-job mode: we now
>> have
>>
>> some common resources available on the client machine that would be read
>> by main
>>
>> methods in user programs. If migrated to per-program mode, we must
>> explicitly
>>
>> set the specific resources for each user program and ship them to the
>> cluster,
>>
>> it would be a bit inconvenient.  Also, as the job graph is compiled at
>> the client,
>>
>> we can recognize the errors caused by user code before starting the
>> cluster
>>
>> and easily get access to the logs.
>>
>>
>>
>> Best,
>>
>> Paul Lam
>>
>>
>>
>> 在 2019年10月30日,16:22,SHI Xiaogang <shixiaoga...@gmail.com> 写道:
>>
>>
>>
>> Hi
>>
>>
>>
>> Thanks for bringing this.
>>
>>
>>
>> The design looks very nice to me in that
>>
>> 1. In the new per-job mode, we don't need to compile user programs in the
>> client and can directly run user programs with user jars. That way, it's
>> easier for resource isolation in multi-tenant platforms and is much safer.
>>
>> 2. The execution of user programs can be unified in session and per-job
>> modes. In session mode, user jobs are submitted via a remote ClusterClient
>> while in per-job mode user jobs are submitted via a local ClusterClient.
>>
>>
>>
>> Regards,
>>
>> Xiaogang
>>
>>
>>
>> tison <wander4...@gmail.com> 于2019年10月30日周三 下午3:30写道:
>>
>> (CC user list because I think users may have ideas on how per-job mode
>> should look like)
>>
>>
>>
>> Hi all,
>>
>> In the discussion about Flink on k8s[1] we encounter a problem that
>> opinions
>> diverge in how so-called per-job mode works. This thread is aimed at
>> stating
>> a dedicated discussion about per-job semantic and how to implement it.
>>
>> **The AS IS per-job mode**
>>
>> * in standalone deployment, we bundle user jar with Flink jar, retrieve
>> JobGraph which is the very first JobGraph from user program in classpath,
>> and then start a Dispatcher with this JobGraph preconfigured, which
>> launches it as "recovered" job.
>>
>> * in YARN deployment, we accept submission via CliFrontend, extract
>> JobGraph
>> which is the very first JobGraph from user program submitted, serialize
>> the JobGraph and upload it to YARN as resource, and then when AM starts,
>> retrieve the JobGraph as resource and start Dispatcher with this JobGraph
>> preconfigured, follows are the same.
>>
>> Specifically, in order to support multiple parts job, if YARN deployment
>> configured as "attached", it starts a SessionCluster, proceeds the
>> progress
>> and shutdown the cluster on job finished.
>>
>> **Motivation**
>>
>> The implementation mentioned above, however, suffers from problems. The
>> major
>> two of them are 1. only respect the very first JobGraph from user program
>> 2.
>> compile job in client side
>>
>> 1. Only respect the very first JobGraph from user program
>>
>> There is already issue about this topic[2]. As we extract JobGraph from
>> user
>> program by hijacking Environment#execute we actually abort any execution
>> after the first call to #execute. Besides it surprises users many times
>> that
>> any logic they write in the program is possibly never executed, here the
>> problem is that the semantic of "job" from Flink perspective. I'd like to
>> say
>> in current implementation "per-job" is actually "per-job-graph". However,
>> in practices since we support jar submission it is "per-program" semantic
>> wanted.
>>
>> 2. Compile job in client side
>>
>> Well, standalone deployment is not in the case. But in YARN deployment, we
>> compile job and get JobGraph in client side, and then upload it to YARN.
>> This approach, however, somehow breaks isolation. We have observed that
>> user
>> program contains exception handling logic which call System.exit in main
>> method, which causes a compilation of the job exit the whole client at
>> once.
>> It is a critical problem if we manage multiple Flink job in a unique
>> platform.
>> In this case, it shut down the whole service.
>>
>> Besides there are many times I was asked why per-job mode doesn't run
>> "just like" session mode but with a dedicated cluster. It might imply that
>> current implementation mismatches users' demand.
>>
>> **Proposal**
>>
>> In order to provide a "per-program" semantic mode which acts "just like"
>> session
>> mode but with a dedicated cluster, I propose a workflow as below. It acts
>> like
>> starting a drive on cluster but is not a general driver solution as
>> proposed
>> here[3], the main purpose of the workflow below is for providing a
>> "per-program"
>> semantic mode.
>>
>> *From CliFrontend*
>>
>> 1. CliFrontend receives submission, gathers all configuration and starts a
>> corresponding ClusterDescriptor.
>>
>> 2. ClusterDescriptor deploys a cluster with main class
>> ProgramClusterEntrypoint
>> while shipping resources including user program.
>>
>> 3. ProgramClusterEntrypoint#main contains logic starting components
>> including
>> Standalone Dispatcher, configuring user program to start a
>> RpcClusterClient,
>> and then invoking main method of user program.
>>
>> 4. RpcClusterClient acts like MiniClusterClient which is able to submit
>> the
>> JobGraph after leader elected so that we don't fallback to round-robin or
>> fail submission due to no leader.
>>
>> 5. Whether or not deliver job result depends on user program logic, since
>> we
>> can already get a JobClient from execute. ProgramClusterEntrypoint exits
>> on
>> user program exits and all jobs submitted globally terminate.
>>
>> This way fits in the direction of FLIP-73 because strategy starting a
>> RpcClusterClient can be regarded as a special Executor. After
>> ProgramClusterEntrypoint#main starts a Cluster, it generates and passes
>> configuration to
>> user program so that when Executor generated, it knows to use a
>> RpcClusterClient
>> for submission and the address of Dispatcher.
>>
>> **Compatibility**
>>
>> In my opinion this mode can be totally an add-on to current codebase. We
>> actually don't replace current per-job mode with so-called "per-program"
>> mode.
>> It happens that current per-job mode would be useless if we have such
>> "per-program" mode so that we possibly deprecate it for preferring the
>> other.
>>
>> I'm glad to discuss more into details if you're interested in, but let's
>> say
>> we'd better first reach a consensus on the overall design :-)
>>
>> Looking forward to your reply!
>>
>> Best,
>> tison.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9953
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D9953&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=p428wH8eWmBwyjHaE0vClbGi51CQxgjJ6Js3X9Kyr04&e=>
>> [2] https://issues.apache.org/jira/browse/FLINK-10879
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D10879&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=mEzfvloedca1XW6pqI9LrR--IKhrkg-YmFMXRULqVSQ&e=>
>> [3]
>> https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__docs.google.com_document_d_1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY_edit-23&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=XNVcSV52D3KneNkZgP7tgo9Y4uBm0jsN0RfYaelP7JM&e=>
>>
>>
>>
>> ------------------------------
>>
>> Your Personal Data: We may collect and process information about you that
>> may be subject to data protection laws. For more information about how we
>> use and disclose your personal data, how we protect your information, our
>> legal basis to use your information, your rights and who you can contact,
>> please refer to: www.gs.com/privacy-notices
>>
>

Reply via email to