Thanks Stephan, I will follow up on this issue in the next few weeks and refine the design doc. We can discuss more details after the 1.9 release.
Stephan Ewen <se...@apache.org> 于2019年7月24日周三 上午12:58写道: > Hi all! > > This thread has stalled for a bit, which I assume ist mostly due to the > Flink 1.9 feature freeze and release testing effort. > > I personally still recognize this issue as one important to be solved. I'd > be happy to help resume this discussion soon (after the 1.9 release) and > see if we can do some step towards this in Flink 1.10. > > Best, > Stephan > > > > On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it> > wrote: > > > That's exactly what I suggested a long time ago: the Flink REST client > > should not require any Flink dependency, only http library to call the > REST > > services to submit and monitor a job. > > What I suggested also in [1] was to have a way to automatically suggest > the > > user (via a UI) the available main classes and their required > > parameters[2]. > > Another problem we have with Flink is that the Rest client and the CLI > one > > behaves differently and we use the CLI client (via ssh) because it allows > > to call some other method after env.execute() [3] (we have to call > another > > REST service to signal the end of the job). > > Int his regard, a dedicated interface, like the JobListener suggested in > > the previous emails, would be very helpful (IMHO). > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864 > > [2] https://issues.apache.org/jira/browse/FLINK-10862 > > [3] https://issues.apache.org/jira/browse/FLINK-10879 > > > > Best, > > Flavio > > > > On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote: > > > > > Hi, Tison, > > > > > > Thanks for your comments. Overall I agree with you that it is difficult > > for > > > down stream project to integrate with flink and we need to refactor the > > > current flink client api. > > > And I agree that CliFrontend should only parsing command line arguments > > and > > > then pass them to ExecutionEnvironment. It is ExecutionEnvironment's > > > responsibility to compile job, create cluster, and submit job. Besides > > > that, Currently flink has many ExecutionEnvironment implementations, > and > > > flink will use the specific one based on the context. IMHO, it is not > > > necessary, ExecutionEnvironment should be able to do the right thing > > based > > > on the FlinkConf it is received. Too many ExecutionEnvironment > > > implementation is another burden for downstream project integration. > > > > > > One thing I'd like to mention is flink's scala shell and sql client, > > > although they are sub-modules of flink, they could be treated as > > downstream > > > project which use flink's client api. Currently you will find it is not > > > easy for them to integrate with flink, they share many duplicated code. > > It > > > is another sign that we should refactor flink client api. > > > > > > I believe it is a large and hard change, and I am afraid we can not > keep > > > compatibility since many of changes are user facing. > > > > > > > > > > > > Zili Chen <wander4...@gmail.com> 于2019年6月24日周一 下午2:53写道: > > > > > > > Hi all, > > > > > > > > After a closer look on our client apis, I can see there are two major > > > > issues to consistency and integration, namely different deployment of > > > > job cluster which couples job graph creation and cluster deployment, > > > > and submission via CliFrontend confusing control flow of job graph > > > > compilation and job submission. 
I'd like to follow the discuss above, > > > > mainly the process described by Jeff and Stephan, and share my > > > > ideas on these issues. > > > > > > > > 1) CliFrontend confuses the control flow of job compilation and > > > submission. > > > > Following the process of job submission Stephan and Jeff described, > > > > execution environment knows all configs of the cluster and > > topos/settings > > > > of the job. Ideally, in the main method of user program, it calls > > > #execute > > > > (or named #submit) and Flink deploys the cluster, compile the job > graph > > > > and submit it to the cluster. However, current CliFrontend does all > > these > > > > things inside its #runProgram method, which introduces a lot of > > > subclasses > > > > of (stream) execution environment. > > > > > > > > Actually, it sets up an exec env that hijacks the > #execute/executePlan > > > > method, initializes the job graph and abort execution. And then > > > > control flow back to CliFrontend, it deploys the cluster(or retrieve > > > > the client) and submits the job graph. This is quite a specific > > internal > > > > process inside Flink and none of consistency to anything. > > > > > > > > 2) Deployment of job cluster couples job graph creation and cluster > > > > deployment. Abstractly, from user job to a concrete submission, it > > > requires > > > > > > > > create JobGraph --\ > > > > > > > > create ClusterClient --> submit JobGraph > > > > > > > > such a dependency. ClusterClient was created by deploying or > > retrieving. > > > > JobGraph submission requires a compiled JobGraph and valid > > ClusterClient, > > > > but the creation of ClusterClient is abstractly independent of that > of > > > > JobGraph. However, in job cluster mode, we deploy job cluster with a > > job > > > > graph, which means we use another process: > > > > > > > > create JobGraph --> deploy cluster with the JobGraph > > > > > > > > Here is another inconsistency and downstream projects/client apis are > > > > forced to handle different cases with rare supports from Flink. > > > > > > > > Since we likely reached a consensus on > > > > > > > > 1. all configs gathered by Flink configuration and passed > > > > 2. execution environment knows all configs and handles execution(both > > > > deployment and submission) > > > > > > > > to the issues above I propose eliminating inconsistencies by > following > > > > approach: > > > > > > > > 1) CliFrontend should exactly be a front end, at least for "run" > > command. > > > > That means it just gathered and passed all config from command line > to > > > > the main method of user program. Execution environment knows all the > > info > > > > and with an addition to utils for ClusterClient, we gracefully get a > > > > ClusterClient by deploying or retrieving. In this way, we don't need > to > > > > hijack #execute/executePlan methods and can remove various hacking > > > > subclasses of exec env, as well as #run methods in ClusterClient(for > an > > > > interface-ized ClusterClient). Now the control flow flows from > > > CliFrontend > > > > to the main method and never returns. > > > > > > > > 2) Job cluster means a cluster for the specific job. From another > > > > perspective, it is an ephemeral session. We may decouple the > deployment > > > > with a compiled job graph, but start a session with idle timeout > > > > and submit the job following. 
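A minimal sketch of the decoupling described in 2), treating a job cluster as an ephemeral session: deployment no longer takes a JobGraph, and every job goes through the same submission path. All types below are hypothetical stand-ins, not existing Flink API:

    // Hypothetical stand-ins for illustration only.
    interface ClusterDeployer {
        // instead of deployJobCluster(jobGraph), deployment is independent of any job
        ClusterHandle deploySession(SessionSpec spec) throws Exception;
        ClusterHandle retrieveSession(String clusterId) throws Exception;
    }

    interface ClusterHandle extends AutoCloseable {
        String submit(CompiledJob job) throws Exception;   // returns a job id
    }

    final class SessionSpec {
        final long idleTimeoutMillis;   // ephemeral session shuts down once idle
        SessionSpec(long idleTimeoutMillis) { this.idleTimeoutMillis = idleTimeoutMillis; }
    }

    final class CompiledJob { /* stands in for a compiled JobGraph */ }

    // A "job cluster" then becomes: deploy an ephemeral session and submit the one job.
    final class JobClusterStyle {
        static String runSingleJob(ClusterDeployer deployer, CompiledJob job) throws Exception {
            try (ClusterHandle cluster = deployer.deploySession(new SessionSpec(60_000))) {
                return cluster.submit(job);
            }
        }
    }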
> > > > > > > > These topics, before we go into more details on design or > > implementation, > > > > are better to be aware and discussed for a consensus. > > > > > > > > Best, > > > > tison. > > > > > > > > > > > > Zili Chen <wander4...@gmail.com> 于2019年6月20日周四 上午3:21写道: > > > > > > > >> Hi Jeff, > > > >> > > > >> Thanks for raising this thread and the design document! > > > >> > > > >> As @Thomas Weise mentioned above, extending config to flink > > > >> requires far more effort than it should be. Another example > > > >> is we achieve detach mode by introduce another execution > > > >> environment which also hijack #execute method. > > > >> > > > >> I agree with your idea that user would configure all things > > > >> and flink "just" respect it. On this topic I think the unusual > > > >> control flow when CliFrontend handle "run" command is the problem. > > > >> It handles several configs, mainly about cluster settings, and > > > >> thus main method of user program is unaware of them. Also it > compiles > > > >> app to job graph by run the main method with a hijacked exec env, > > > >> which constrain the main method further. > > > >> > > > >> I'd like to write down a few of notes on configs/args pass and > > respect, > > > >> as well as decoupling job compilation and submission. Share on this > > > >> thread later. > > > >> > > > >> Best, > > > >> tison. > > > >> > > > >> > > > >> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月17日周一 下午7:29写道: > > > >> > > > >>> Hi Jeff and Flavio, > > > >>> > > > >>> Thanks Jeff a lot for proposing the design document. > > > >>> > > > >>> We are also working on refactoring ClusterClient to allow flexible > > and > > > >>> efficient job management in our real-time platform. > > > >>> We would like to draft a document to share our ideas with you. > > > >>> > > > >>> I think it's a good idea to have something like Apache Livy for > > Flink, > > > >>> and > > > >>> the efforts discussed here will take a great step forward to it. > > > >>> > > > >>> Regards, > > > >>> Xiaogang > > > >>> > > > >>> Flavio Pompermaier <pomperma...@okkam.it> 于2019年6月17日周一 下午7:13写道: > > > >>> > > > >>> > Is there any possibility to have something like Apache Livy [1] > > also > > > >>> for > > > >>> > Flink in the future? > > > >>> > > > > >>> > [1] https://livy.apache.org/ > > > >>> > > > > >>> > On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> > > wrote: > > > >>> > > > > >>> > > >>> Any API we expose should not have dependencies on the > > runtime > > > >>> > > (flink-runtime) package or other implementation details. To me, > > > this > > > >>> > means > > > >>> > > that the current ClusterClient cannot be exposed to users > because > > > it > > > >>> > uses > > > >>> > > quite some classes from the optimiser and runtime packages. > > > >>> > > > > > >>> > > We should change ClusterClient from class to interface. > > > >>> > > ExecutionEnvironment only use the interface ClusterClient which > > > >>> should be > > > >>> > > in flink-clients while the concrete implementation class could > be > > > in > > > >>> > > flink-runtime. > > > >>> > > > > > >>> > > >>> What happens when a failure/restart in the client happens? > > > There > > > >>> need > > > >>> > > to be a way of re-establishing the connection to the job, set > up > > > the > > > >>> > > listeners again, etc. > > > >>> > > > > > >>> > > Good point. First we need to define what does failure/restart > in > > > the > > > >>> > > client mean. 
IIUC, that usually mean network failure which will > > > >>> happen in > > > >>> > > class RestClient. If my understanding is correct, restart/retry > > > >>> mechanism > > > >>> > > should be done in RestClient. > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > Aljoscha Krettek <aljos...@apache.org> 于2019年6月11日周二 > 下午11:10写道: > > > >>> > > > > > >>> > > > Some points to consider: > > > >>> > > > > > > >>> > > > * Any API we expose should not have dependencies on the > runtime > > > >>> > > > (flink-runtime) package or other implementation details. To > me, > > > >>> this > > > >>> > > means > > > >>> > > > that the current ClusterClient cannot be exposed to users > > because > > > >>> it > > > >>> > > uses > > > >>> > > > quite some classes from the optimiser and runtime packages. > > > >>> > > > > > > >>> > > > * What happens when a failure/restart in the client happens? > > > There > > > >>> need > > > >>> > > to > > > >>> > > > be a way of re-establishing the connection to the job, set up > > the > > > >>> > > listeners > > > >>> > > > again, etc. > > > >>> > > > > > > >>> > > > Aljoscha > > > >>> > > > > > > >>> > > > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> > > > wrote: > > > >>> > > > > > > > >>> > > > > Sorry folks, the design doc is late as you expected. Here's > > the > > > >>> > design > > > >>> > > > doc > > > >>> > > > > I drafted, welcome any comments and feedback. > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > > > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > Stephan Ewen <se...@apache.org> 于2019年2月14日周四 下午8:43写道: > > > >>> > > > > > > > >>> > > > >> Nice that this discussion is happening. > > > >>> > > > >> > > > >>> > > > >> In the FLIP, we could also revisit the entire role of the > > > >>> > environments > > > >>> > > > >> again. > > > >>> > > > >> > > > >>> > > > >> Initially, the idea was: > > > >>> > > > >> - the environments take care of the specific setup for > > > >>> standalone > > > >>> > (no > > > >>> > > > >> setup needed), yarn, mesos, etc. > > > >>> > > > >> - the session ones have control over the session. The > > > >>> environment > > > >>> > > holds > > > >>> > > > >> the session client. > > > >>> > > > >> - running a job gives a "control" object for that job. > That > > > >>> > behavior > > > >>> > > is > > > >>> > > > >> the same in all environments. > > > >>> > > > >> > > > >>> > > > >> The actual implementation diverged quite a bit from that. > > > Happy > > > >>> to > > > >>> > > see a > > > >>> > > > >> discussion about straitening this out a bit more. > > > >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang < > > zjf...@gmail.com> > > > >>> > wrote: > > > >>> > > > >> > > > >>> > > > >>> Hi folks, > > > >>> > > > >>> > > > >>> > > > >>> Sorry for late response, It seems we reach consensus on > > > this, I > > > >>> > will > > > >>> > > > >> create > > > >>> > > > >>> FLIP for this with more detailed design > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> Thomas Weise <t...@apache.org> 于2018年12月21日周五 上午11:43写道: > > > >>> > > > >>> > > > >>> > > > >>>> Great to see this discussion seeded! 
The problems you > face > > > >>> with > > > >>> > the > > > >>> > > > >>>> Zeppelin integration are also affecting other downstream > > > >>> projects, > > > >>> > > > like > > > >>> > > > >>>> Beam. > > > >>> > > > >>>> > > > >>> > > > >>>> We just enabled the savepoint restore option in > > > >>> > > > RemoteStreamEnvironment > > > >>> > > > >>> [1] > > > >>> > > > >>>> and that was more difficult than it should be. The main > > > issue > > > >>> is > > > >>> > > that > > > >>> > > > >>>> environment and cluster client aren't decoupled. Ideally > > it > > > >>> should > > > >>> > > be > > > >>> > > > >>>> possible to just get the matching cluster client from > the > > > >>> > > environment > > > >>> > > > >> and > > > >>> > > > >>>> then control the job through it (environment as factory > > for > > > >>> > cluster > > > >>> > > > >>>> client). But note that the environment classes are part > of > > > the > > > >>> > > public > > > >>> > > > >>> API, > > > >>> > > > >>>> and it is not straightforward to make larger changes > > without > > > >>> > > breaking > > > >>> > > > >>>> backward compatibility. > > > >>> > > > >>>> > > > >>> > > > >>>> ClusterClient currently exposes internal classes like > > > >>> JobGraph and > > > >>> > > > >>>> StreamGraph. But it should be possible to wrap this > with a > > > new > > > >>> > > public > > > >>> > > > >> API > > > >>> > > > >>>> that brings the required job control capabilities for > > > >>> downstream > > > >>> > > > >>> projects. > > > >>> > > > >>>> Perhaps it is helpful to look at some of the interfaces > in > > > >>> Beam > > > >>> > > while > > > >>> > > > >>>> thinking about this: [2] for the portable job API and > [3] > > > for > > > >>> the > > > >>> > > old > > > >>> > > > >>>> asynchronous job control from the Beam Java SDK. > > > >>> > > > >>>> > > > >>> > > > >>>> The backward compatibility discussion [4] is also > relevant > > > >>> here. A > > > >>> > > new > > > >>> > > > >>> API > > > >>> > > > >>>> should shield downstream projects from internals and > allow > > > >>> them to > > > >>> > > > >>>> interoperate with multiple future Flink versions in the > > same > > > >>> > release > > > >>> > > > >> line > > > >>> > > > >>>> without forced upgrades. 
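As a rough sketch of such a wrapping API, in the spirit of Beam's PipelineResult: downstream projects would only see a small job-control surface and never touch JobGraph or StreamGraph. The names below are hypothetical:

    // Hypothetical public job-control surface; JobGraph, StreamGraph and the
    // ClusterClient stay behind the implementation.
    interface JobControl {
        String getJobId();
        JobState getState() throws Exception;
        void cancel() throws Exception;
        String triggerSavepoint(String targetDirectory) throws Exception;
        void waitForCompletion() throws Exception;
    }

    enum JobState { RUNNING, FINISHED, CANCELED, FAILED }

    // The environment acts as a factory for job control: executing a pipeline
    // hands back a JobControl instead of blocking until the job finishes.
    interface PipelineEnvironment {
        JobControl executeAsync(String jobName) throws Exception;
    }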
> > > >>> > > > >>>> > > > >>> > > > >>>> Thanks, > > > >>> > > > >>>> Thomas > > > >>> > > > >>>> > > > >>> > > > >>>> [1] https://github.com/apache/flink/pull/7249 > > > >>> > > > >>>> [2] > > > >>> > > > >>>> > > > >>> > > > >>>> > > > >>> > > > >>> > > > >>> > > > >> > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > > > > https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto > > > >>> > > > >>>> [3] > > > >>> > > > >>>> > > > >>> > > > >>>> > > > >>> > > > >>> > > > >>> > > > >> > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > > > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java > > > >>> > > > >>>> [4] > > > >>> > > > >>>> > > > >>> > > > >>>> > > > >>> > > > >>> > > > >>> > > > >> > > > >>> > > > > > > >>> > > > > > >>> > > > > >>> > > > > > > https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E > > > >>> > > > >>>> > > > >>> > > > >>>> > > > >>> > > > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang < > > > zjf...@gmail.com> > > > >>> > > wrote: > > > >>> > > > >>>> > > > >>> > > > >>>>>>>> I'm not so sure whether the user should be able to > > > define > > > >>> > where > > > >>> > > > >> the > > > >>> > > > >>>> job > > > >>> > > > >>>>> runs (in your example Yarn). This is actually > independent > > > of > > > >>> the > > > >>> > > job > > > >>> > > > >>>>> development and is something which is decided at > > deployment > > > >>> time. > > > >>> > > > >>>>> > > > >>> > > > >>>>> User don't need to specify execution mode > > programmatically. > > > >>> They > > > >>> > > can > > > >>> > > > >>> also > > > >>> > > > >>>>> pass the execution mode from the arguments in flink run > > > >>> command. > > > >>> > > e.g. > > > >>> > > > >>>>> > > > >>> > > > >>>>> bin/flink run -m yarn-cluster .... > > > >>> > > > >>>>> bin/flink run -m local ... > > > >>> > > > >>>>> bin/flink run -m host:port ... > > > >>> > > > >>>>> > > > >>> > > > >>>>> Does this make sense to you ? > > > >>> > > > >>>>> > > > >>> > > > >>>>>>>> To me it makes sense that the ExecutionEnvironment > is > > > not > > > >>> > > > >> directly > > > >>> > > > >>>>> initialized by the user and instead context sensitive > how > > > you > > > >>> > want > > > >>> > > to > > > >>> > > > >>>>> execute your job (Flink CLI vs. IDE, for example). > > > >>> > > > >>>>> > > > >>> > > > >>>>> Right, currently I notice Flink would create different > > > >>> > > > >>>>> ContextExecutionEnvironment based on different > submission > > > >>> > scenarios > > > >>> > > > >>>> (Flink > > > >>> > > > >>>>> Cli vs IDE). To me this is kind of hack approach, not > so > > > >>> > > > >>> straightforward. > > > >>> > > > >>>>> What I suggested above is that is that flink should > > always > > > >>> create > > > >>> > > the > > > >>> > > > >>>> same > > > >>> > > > >>>>> ExecutionEnvironment but with different configuration, > > and > > > >>> based > > > >>> > on > > > >>> > > > >> the > > > >>> > > > >>>>> configuration it would create the proper ClusterClient > > for > > > >>> > > different > > > >>> > > > >>>>> behaviors. 
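A sketch of that idea: one environment whose behaviour is driven purely by the configured execution mode, so "-m yarn-cluster", "-m local" and "-m host:port" share one code path instead of different hijacking environment subclasses. The classes and the "execution.mode" key below are illustrative only, not existing Flink API:

    // Hypothetical sketch: one environment, behaviour driven only by configuration.
    interface JobSubmitter {
        String submit(byte[] serializedJobGraph) throws Exception;   // returns a job id
    }

    final class UnifiedEnvironment {
        private final JobSubmitter submitter;

        UnifiedEnvironment(java.util.Map<String, String> conf) {
            // the environment picks the right client from configuration,
            // rather than CliFrontend choosing a special environment subclass
            String mode = conf.getOrDefault("execution.mode", "local");
            if ("local".equals(mode)) {
                this.submitter = graph -> "local-" + graph.length;      // in-process execution
            } else if ("yarn-cluster".equals(mode)) {
                this.submitter = graph -> "yarn-app-" + graph.length;   // deploy on YARN, then submit
            } else {
                this.submitter = graph -> mode + "/jobs";               // REST submission to "host:port"
            }
        }

        String execute(byte[] serializedJobGraph) throws Exception {
            return submitter.submit(serializedJobGraph);
        }
    }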
> > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> Till Rohrmann <trohrm...@apache.org> 于2018年12月20日周四 > > > >>> 下午11:18写道: > > > >>> > > > >>>>> > > > >>> > > > >>>>>> You are probably right that we have code duplication > > when > > > it > > > >>> > comes > > > >>> > > > >> to > > > >>> > > > >>>> the > > > >>> > > > >>>>>> creation of the ClusterClient. This should be reduced > in > > > the > > > >>> > > > >> future. > > > >>> > > > >>>>>> > > > >>> > > > >>>>>> I'm not so sure whether the user should be able to > > define > > > >>> where > > > >>> > > the > > > >>> > > > >>> job > > > >>> > > > >>>>>> runs (in your example Yarn). This is actually > > independent > > > >>> of the > > > >>> > > > >> job > > > >>> > > > >>>>>> development and is something which is decided at > > > deployment > > > >>> > time. > > > >>> > > > >> To > > > >>> > > > >>> me > > > >>> > > > >>>>> it > > > >>> > > > >>>>>> makes sense that the ExecutionEnvironment is not > > directly > > > >>> > > > >> initialized > > > >>> > > > >>>> by > > > >>> > > > >>>>>> the user and instead context sensitive how you want to > > > >>> execute > > > >>> > > your > > > >>> > > > >>> job > > > >>> > > > >>>>>> (Flink CLI vs. IDE, for example). However, I agree > that > > > the > > > >>> > > > >>>>>> ExecutionEnvironment should give you access to the > > > >>> ClusterClient > > > >>> > > > >> and > > > >>> > > > >>> to > > > >>> > > > >>>>> the > > > >>> > > > >>>>>> job (maybe in the form of the JobGraph or a job plan). > > > >>> > > > >>>>>> > > > >>> > > > >>>>>> Cheers, > > > >>> > > > >>>>>> Till > > > >>> > > > >>>>>> > > > >>> > > > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang < > > > >>> zjf...@gmail.com> > > > >>> > > > >> wrote: > > > >>> > > > >>>>>> > > > >>> > > > >>>>>>> Hi Till, > > > >>> > > > >>>>>>> Thanks for the feedback. You are right that I expect > > > better > > > >>> > > > >>>>> programmatic > > > >>> > > > >>>>>>> job submission/control api which could be used by > > > >>> downstream > > > >>> > > > >>> project. > > > >>> > > > >>>>> And > > > >>> > > > >>>>>>> it would benefit for the flink ecosystem. When I look > > at > > > >>> the > > > >>> > code > > > >>> > > > >>> of > > > >>> > > > >>>>>> flink > > > >>> > > > >>>>>>> scala-shell and sql-client (I believe they are not > the > > > >>> core of > > > >>> > > > >>> flink, > > > >>> > > > >>>>> but > > > >>> > > > >>>>>>> belong to the ecosystem of flink), I find many > > duplicated > > > >>> code > > > >>> > > > >> for > > > >>> > > > >>>>>> creating > > > >>> > > > >>>>>>> ClusterClient from user provided configuration > > > >>> (configuration > > > >>> > > > >>> format > > > >>> > > > >>>>> may > > > >>> > > > >>>>>> be > > > >>> > > > >>>>>>> different from scala-shell and sql-client) and then > use > > > >>> that > > > >>> > > > >>>>>> ClusterClient > > > >>> > > > >>>>>>> to manipulate jobs. I don't think this is convenient > > for > > > >>> > > > >> downstream > > > >>> > > > >>>>>>> projects. 
What I expect is that downstream project > only > > > >>> needs > > > >>> > to > > > >>> > > > >>>>> provide > > > >>> > > > >>>>>>> necessary configuration info (maybe introducing class > > > >>> > FlinkConf), > > > >>> > > > >>> and > > > >>> > > > >>>>>> then > > > >>> > > > >>>>>>> build ExecutionEnvironment based on this FlinkConf, > and > > > >>> > > > >>>>>>> ExecutionEnvironment will create the proper > > > ClusterClient. > > > >>> It > > > >>> > not > > > >>> > > > >>>> only > > > >>> > > > >>>>>>> benefit for the downstream project development but > also > > > be > > > >>> > > > >> helpful > > > >>> > > > >>>> for > > > >>> > > > >>>>>>> their integration test with flink. Here's one sample > > code > > > >>> > snippet > > > >>> > > > >>>> that > > > >>> > > > >>>>> I > > > >>> > > > >>>>>>> expect. > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> val conf = new FlinkConf().mode("yarn") > > > >>> > > > >>>>>>> val env = new ExecutionEnvironment(conf) > > > >>> > > > >>>>>>> val jobId = env.submit(...) > > > >>> > > > >>>>>>> val jobStatus = > > > >>> env.getClusterClient().queryJobStatus(jobId) > > > >>> > > > >>>>>>> env.getClusterClient().cancelJob(jobId) > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> What do you think ? > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> Till Rohrmann <trohrm...@apache.org> 于2018年12月11日周二 > > > >>> 下午6:28写道: > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>>> Hi Jeff, > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>> what you are proposing is to provide the user with > > > better > > > >>> > > > >>>>> programmatic > > > >>> > > > >>>>>>> job > > > >>> > > > >>>>>>>> control. There was actually an effort to achieve > this > > > but > > > >>> it > > > >>> > > > >> has > > > >>> > > > >>>>> never > > > >>> > > > >>>>>>> been > > > >>> > > > >>>>>>>> completed [1]. However, there are some improvement > in > > > the > > > >>> code > > > >>> > > > >>> base > > > >>> > > > >>>>>> now. > > > >>> > > > >>>>>>>> Look for example at the NewClusterClient interface > > which > > > >>> > > > >> offers a > > > >>> > > > >>>>>>>> non-blocking job submission. But I agree that we > need > > to > > > >>> > > > >> improve > > > >>> > > > >>>>> Flink > > > >>> > > > >>>>>> in > > > >>> > > > >>>>>>>> this regard. > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>> I would not be in favour if exposing all > ClusterClient > > > >>> calls > > > >>> > > > >> via > > > >>> > > > >>>> the > > > >>> > > > >>>>>>>> ExecutionEnvironment because it would clutter the > > class > > > >>> and > > > >>> > > > >> would > > > >>> > > > >>>> not > > > >>> > > > >>>>>> be > > > >>> > > > >>>>>>> a > > > >>> > > > >>>>>>>> good separation of concerns. Instead one idea could > be > > > to > > > >>> > > > >>> retrieve > > > >>> > > > >>>>> the > > > >>> > > > >>>>>>>> current ClusterClient from the ExecutionEnvironment > > > which > > > >>> can > > > >>> > > > >>> then > > > >>> > > > >>>> be > > > >>> > > > >>>>>>> used > > > >>> > > > >>>>>>>> for cluster and job control. But before we start an > > > effort > > > >>> > > > >> here, > > > >>> > > > >>> we > > > >>> > > > >>>>>> need > > > >>> > > > >>>>>>> to > > > >>> > > > >>>>>>>> agree and capture what functionality we want to > > provide. 
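One possible minimal surface for that separation, sketched with hypothetical names (ClusterControl is not the existing ClusterClient API): the environment only builds and exposes the client it already holds, while cluster and job control live on the client rather than on the environment:

    // Hypothetical: the environment only owns the client; control goes through it.
    interface ClusterControl {
        String queryJobStatus(String jobId) throws Exception;
        void cancelJob(String jobId) throws Exception;
        java.util.List<String> listJobs() throws Exception;
    }

    interface Environment {
        String submit() throws Exception;            // non-blocking, returns the job id
        ClusterControl getClusterClient();           // access point for control operations
    }

    final class ControlExample {
        static void cancelIfRunning(Environment env) throws Exception {
            String jobId = env.submit();
            String status = env.getClusterClient().queryJobStatus(jobId);
            if ("RUNNING".equals(status)) {
                env.getClusterClient().cancelJob(jobId);
            }
        }
    }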
> > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>> Initially, the idea was that we have the > > > ClusterDescriptor > > > >>> > > > >>>> describing > > > >>> > > > >>>>>> how > > > >>> > > > >>>>>>>> to talk to cluster manager like Yarn or Mesos. The > > > >>> > > > >>>> ClusterDescriptor > > > >>> > > > >>>>>> can > > > >>> > > > >>>>>>> be > > > >>> > > > >>>>>>>> used for deploying Flink clusters (job and session) > > and > > > >>> gives > > > >>> > > > >>> you a > > > >>> > > > >>>>>>>> ClusterClient. The ClusterClient controls the > cluster > > > >>> (e.g. > > > >>> > > > >>>>> submitting > > > >>> > > > >>>>>>>> jobs, listing all running jobs). And then there was > > the > > > >>> idea > > > >>> > to > > > >>> > > > >>>>>>> introduce a > > > >>> > > > >>>>>>>> JobClient which you obtain from the ClusterClient to > > > >>> trigger > > > >>> > > > >> job > > > >>> > > > >>>>>> specific > > > >>> > > > >>>>>>>> operations (e.g. taking a savepoint, cancelling the > > > job). > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>> [1] > https://issues.apache.org/jira/browse/FLINK-4272 > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>> Cheers, > > > >>> > > > >>>>>>>> Till > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang < > > > >>> zjf...@gmail.com > > > >>> > > > > > >>> > > > >>>>> wrote: > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>>>> Hi Folks, > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> I am trying to integrate flink into apache zeppelin > > > >>> which is > > > >>> > > > >> an > > > >>> > > > >>>>>>>> interactive > > > >>> > > > >>>>>>>>> notebook. And I hit several issues that is caused > by > > > >>> flink > > > >>> > > > >>> client > > > >>> > > > >>>>>> api. > > > >>> > > > >>>>>>> So > > > >>> > > > >>>>>>>>> I'd like to proposal the following changes for > flink > > > >>> client > > > >>> > > > >>> api. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 1. Support nonblocking execution. Currently, > > > >>> > > > >>>>>>> ExecutionEnvironment#execute > > > >>> > > > >>>>>>>>> is a blocking method which would do 2 things, first > > > >>> submit > > > >>> > > > >> job > > > >>> > > > >>>> and > > > >>> > > > >>>>>> then > > > >>> > > > >>>>>>>>> wait for job until it is finished. I'd like > > introduce a > > > >>> > > > >>>> nonblocking > > > >>> > > > >>>>>>>>> execution method like ExecutionEnvironment#submit > > which > > > >>> only > > > >>> > > > >>>> submit > > > >>> > > > >>>>>> job > > > >>> > > > >>>>>>>> and > > > >>> > > > >>>>>>>>> then return jobId to client. And allow user to > query > > > the > > > >>> job > > > >>> > > > >>>> status > > > >>> > > > >>>>>> via > > > >>> > > > >>>>>>>> the > > > >>> > > > >>>>>>>>> jobId. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 2. Add cancel api in > > > >>> > > > >>>>> ExecutionEnvironment/StreamExecutionEnvironment, > > > >>> > > > >>>>>>>>> currently the only way to cancel job is via cli > > > >>> (bin/flink), > > > >>> > > > >>> this > > > >>> > > > >>>>> is > > > >>> > > > >>>>>>> not > > > >>> > > > >>>>>>>>> convenient for downstream project to use this > > feature. > > > >>> So I'd > > > >>> > > > >>>> like > > > >>> > > > >>>>> to > > > >>> > > > >>>>>>> add > > > >>> > > > >>>>>>>>> cancel api in ExecutionEnvironment > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 3. Add savepoint api in > > > >>> > > > >>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. 
> > > >>> > > > >>>>>>>> It > > > >>> > > > >>>>>>>>> is similar as cancel api, we should use > > > >>> ExecutionEnvironment > > > >>> > > > >> as > > > >>> > > > >>>> the > > > >>> > > > >>>>>>>> unified > > > >>> > > > >>>>>>>>> api for third party to integrate with flink. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 4. Add listener for job execution lifecycle. > > Something > > > >>> like > > > >>> > > > >>>>>> following, > > > >>> > > > >>>>>>> so > > > >>> > > > >>>>>>>>> that downstream project can do custom logic in the > > > >>> lifecycle > > > >>> > > > >> of > > > >>> > > > >>>>> job. > > > >>> > > > >>>>>>> e.g. > > > >>> > > > >>>>>>>>> Zeppelin would capture the jobId after job is > > submitted > > > >>> and > > > >>> > > > >>> then > > > >>> > > > >>>>> use > > > >>> > > > >>>>>>> this > > > >>> > > > >>>>>>>>> jobId to cancel it later when necessary. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> public interface JobListener { > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> void onJobSubmitted(JobID jobId); > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> void onJobExecuted(JobExecutionResult jobResult); > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> void onJobCanceled(JobID jobId); > > > >>> > > > >>>>>>>>> } > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 5. Enable session in ExecutionEnvironment. > Currently > > it > > > >>> is > > > >>> > > > >>>>> disabled, > > > >>> > > > >>>>>>> but > > > >>> > > > >>>>>>>>> session is very convenient for third party to > > > submitting > > > >>> jobs > > > >>> > > > >>>>>>>> continually. > > > >>> > > > >>>>>>>>> I hope flink can enable it again. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 6. Unify all flink client api into > > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> This is a long term issue which needs more careful > > > >>> thinking > > > >>> > > > >> and > > > >>> > > > >>>>>> design. > > > >>> > > > >>>>>>>>> Currently some of features of flink is exposed in > > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, > but > > > >>> some are > > > >>> > > > >>>>> exposed > > > >>> > > > >>>>>>> in > > > >>> > > > >>>>>>>>> cli instead of api, like the cancel and savepoint I > > > >>> mentioned > > > >>> > > > >>>>> above. > > > >>> > > > >>>>>> I > > > >>> > > > >>>>>>>>> think the root cause is due to that flink didn't > > unify > > > >>> the > > > >>> > > > >>>>>> interaction > > > >>> > > > >>>>>>>> with > > > >>> > > > >>>>>>>>> flink. Here I list 3 scenarios of flink operation > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> - Local job execution. Flink will create > > > >>> LocalEnvironment > > > >>> > > > >>> and > > > >>> > > > >>>>>> then > > > >>> > > > >>>>>>>> use > > > >>> > > > >>>>>>>>> this LocalEnvironment to create LocalExecutor for > > job > > > >>> > > > >>>> execution. > > > >>> > > > >>>>>>>>> - Remote job execution. Flink will create > > > ClusterClient > > > >>> > > > >>> first > > > >>> > > > >>>>> and > > > >>> > > > >>>>>>> then > > > >>> > > > >>>>>>>>> create ContextEnvironment based on the > > ClusterClient > > > >>> and > > > >>> > > > >>> then > > > >>> > > > >>>>> run > > > >>> > > > >>>>>>> the > > > >>> > > > >>>>>>>>> job. > > > >>> > > > >>>>>>>>> - Job cancelation. Flink will create > ClusterClient > > > >>> first > > > >>> > > > >> and > > > >>> > > > >>>>> then > > > >>> > > > >>>>>>>> cancel > > > >>> > > > >>>>>>>>> this job via this ClusterClient. 
> > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> As you can see in the above 3 scenarios. Flink > didn't > > > >>> use the > > > >>> > > > >>>> same > > > >>> > > > >>>>>>>>> approach(code path) to interact with flink > > > >>> > > > >>>>>>>>> What I propose is following: > > > >>> > > > >>>>>>>>> Create the proper > LocalEnvironment/RemoteEnvironment > > > >>> (based > > > >>> > > > >> on > > > >>> > > > >>>> user > > > >>> > > > >>>>>>>>> configuration) --> Use this Environment to create > > > proper > > > >>> > > > >>>>>> ClusterClient > > > >>> > > > >>>>>>>>> (LocalClusterClient or RestClusterClient) to > > > interactive > > > >>> with > > > >>> > > > >>>>> Flink ( > > > >>> > > > >>>>>>> job > > > >>> > > > >>>>>>>>> execution or cancelation) > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> This way we can unify the process of local > execution > > > and > > > >>> > > > >> remote > > > >>> > > > >>>>>>>> execution. > > > >>> > > > >>>>>>>>> And it is much easier for third party to integrate > > with > > > >>> > > > >> flink, > > > >>> > > > >>>>>> because > > > >>> > > > >>>>>>>>> ExecutionEnvironment is the unified entry point for > > > >>> flink. > > > >>> > > > >> What > > > >>> > > > >>>>> third > > > >>> > > > >>>>>>>> party > > > >>> > > > >>>>>>>>> needs to do is just pass configuration to > > > >>> > > > >> ExecutionEnvironment > > > >>> > > > >>>> and > > > >>> > > > >>>>>>>>> ExecutionEnvironment will do the right thing based > on > > > the > > > >>> > > > >>>>>>> configuration. > > > >>> > > > >>>>>>>>> Flink cli can also be considered as flink api > > consumer. > > > >>> it > > > >>> > > > >> just > > > >>> > > > >>>>> pass > > > >>> > > > >>>>>>> the > > > >>> > > > >>>>>>>>> configuration to ExecutionEnvironment and let > > > >>> > > > >>>> ExecutionEnvironment > > > >>> > > > >>>>> to > > > >>> > > > >>>>>>>>> create the proper ClusterClient instead of letting > > cli > > > to > > > >>> > > > >>> create > > > >>> > > > >>>>>>>>> ClusterClient directly. > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> 6 would involve large code refactoring, so I think > we > > > can > > > >>> > > > >> defer > > > >>> > > > >>>> it > > > >>> > > > >>>>>> for > > > >>> > > > >>>>>>>>> future release, 1,2,3,4,5 could be done at once I > > > >>> believe. 
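For illustration, a sketch of how the proposed pieces from points 1, 2 and 4 (non-blocking submit, cancel, JobListener) could be used together by a downstream project such as Zeppelin; the interfaces below are simplified stand-ins for the proposal (JobID reduced to String), not existing API:

    // Hypothetical usage of the proposed API from a downstream project.
    final class NotebookJobRunner {
        private volatile String currentJobId;

        void run(ProposedEnvironment env) throws Exception {
            env.registerJobListener(new ProposedJobListener() {
                public void onJobSubmitted(String jobId) { currentJobId = jobId; }  // keep for a later cancel
                public void onJobExecuted(String jobId)  { currentJobId = null; }
                public void onJobCanceled(String jobId)  { currentJobId = null; }
            });
            env.submit();                            // returns right after submission, no blocking wait
        }

        void stop(ProposedEnvironment env) throws Exception {
            String jobId = currentJobId;
            if (jobId != null) {
                env.cancel(jobId);                   // proposed cancel API on the environment
            }
        }
    }

    // Simplified stand-ins for the proposed interfaces.
    interface ProposedEnvironment {
        void registerJobListener(ProposedJobListener listener);
        String submit() throws Exception;
        void cancel(String jobId) throws Exception;
    }

    interface ProposedJobListener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
        void onJobCanceled(String jobId);
    }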
> > > >>> > > > >> Let > > > >>> > > > >>> me > > > >>> > > > >>>>>> know > > > >>> > > > >>>>>>>> your > > > >>> > > > >>>>>>>>> comments and feedback, thanks > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> -- > > > >>> > > > >>>>>>>>> Best Regards > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>>> Jeff Zhang > > > >>> > > > >>>>>>>>> > > > >>> > > > >>>>>>>> > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> -- > > > >>> > > > >>>>>>> Best Regards > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>>> Jeff Zhang > > > >>> > > > >>>>>>> > > > >>> > > > >>>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> > > > >>> > > > >>>>> -- > > > >>> > > > >>>>> Best Regards > > > >>> > > > >>>>> > > > >>> > > > >>>>> Jeff Zhang > > > >>> > > > >>>>> > > > >>> > > > >>>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> -- > > > >>> > > > >>> Best Regards > > > >>> > > > >>> > > > >>> > > > >>> Jeff Zhang > > > >>> > > > >>> > > > >>> > > > >> > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > -- > > > >>> > > > > Best Regards > > > >>> > > > > > > > >>> > > > > Jeff Zhang > > > >>> > > > > > > >>> > > > > > > >>> > > > > > >>> > > -- > > > >>> > > Best Regards > > > >>> > > > > > >>> > > Jeff Zhang > > > >>> > > > > > >>> > > > > >>> > > > >> > > > > > > -- > > > Best Regards > > > > > > Jeff Zhang > > > > > > -- Best Regards Jeff Zhang