Thanks Stephan, I will follow up on this issue in the next few weeks and
refine the design doc. We can discuss more details after the 1.9 release.

On Wed, Jul 24, 2019 at 12:58 AM Stephan Ewen <se...@apache.org> wrote:

> Hi all!
>
> This thread has stalled for a bit, which I assume is mostly due to the
> Flink 1.9 feature freeze and release testing effort.
>
> I personally still consider this an important issue to solve. I'd
> be happy to help resume this discussion soon (after the 1.9 release) and
> see if we can take some steps towards this in Flink 1.10.
>
> Best,
> Stephan
>
>
>
> On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
> > That's exactly what I suggested a long time ago: the Flink REST client
> > should not require any Flink dependency, only an HTTP library to call the
> > REST services to submit and monitor a job.
> > What I also suggested in [1] was a way to automatically show the user
> > (via a UI) the available main classes and their required parameters [2].
> > Another problem we have with Flink is that the REST client and the CLI
> > client behave differently, and we use the CLI client (via ssh) because it
> > allows us to call some other method after env.execute() [3] (we have to
> > call another REST service to signal the end of the job).
> > In this regard, a dedicated interface, like the JobListener suggested in
> > the previous emails, would be very helpful (IMHO).
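For illustration only, here is a minimal Java sketch of how such a listener could cover the "signal the end of the job" case. JobId, JobResult, RecordingListener and ListenableEnvironment are hypothetical stand-ins made up for this example, not existing Flink classes; only the three callback names are taken from the JobListener proposal quoted further down.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point: registers a listener and runs one simulated job.
class JobListenerSketch {
    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        ListenableEnvironment env = new ListenableEnvironment();
        env.addJobListener(listener);
        env.execute("wordcount");
        System.out.println(listener.events);
    }
}

// Hypothetical stand-in for a job identifier.
final class JobId {
    final String value;
    JobId(String value) { this.value = value; }
}

// Hypothetical stand-in for a job execution result.
final class JobResult {
    final JobId jobId;
    JobResult(JobId jobId) { this.jobId = jobId; }
}

// Callback shape proposed in this thread, adapted to the stand-in types.
interface JobListener {
    void onJobSubmitted(JobId jobId);
    void onJobExecuted(JobResult jobResult);
    void onJobCanceled(JobId jobId);
}

// Records lifecycle events. A real listener could instead call an external
// REST service in onJobExecuted to signal that the job has ended.
final class RecordingListener implements JobListener {
    final List<String> events = new ArrayList<>();
    @Override public void onJobSubmitted(JobId jobId) { events.add("submitted:" + jobId.value); }
    @Override public void onJobExecuted(JobResult r) { events.add("executed:" + r.jobId.value); }
    @Override public void onJobCanceled(JobId jobId) { events.add("canceled:" + jobId.value); }
}

// Hypothetical environment that notifies listeners around a simulated run.
final class ListenableEnvironment {
    private final List<JobListener> listeners = new ArrayList<>();

    void addJobListener(JobListener listener) { listeners.add(listener); }

    JobResult execute(String jobName) {
        JobId jobId = new JobId(jobName + "-1");
        for (JobListener l : listeners) l.onJobSubmitted(jobId);
        // A real environment would submit the job and wait for completion here.
        JobResult result = new JobResult(jobId);
        for (JobListener l : listeners) l.onJobExecuted(result);
        return result;
    }
}
```

With something of this shape, the post-execute() call would no longer depend on which client (CLI or REST) submitted the job.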
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10864
> > [2] https://issues.apache.org/jira/browse/FLINK-10862
> > [3] https://issues.apache.org/jira/browse/FLINK-10879
> >
> > Best,
> > Flavio
> >
> > On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >
> > > Hi, Tison,
> > >
> > > Thanks for your comments. Overall I agree with you that it is difficult
> > > for downstream projects to integrate with Flink and that we need to
> > > refactor the current Flink client API.
> > > I also agree that CliFrontend should only parse command line arguments
> > > and then pass them to ExecutionEnvironment. It is ExecutionEnvironment's
> > > responsibility to compile the job, create the cluster, and submit the
> > > job. Besides that, Flink currently has many ExecutionEnvironment
> > > implementations and picks a specific one based on the context. IMHO this
> > > is not necessary: ExecutionEnvironment should be able to do the right
> > > thing based on the FlinkConf it receives. Having too many
> > > ExecutionEnvironment implementations is another burden for downstream
> > > project integration.
> > >
> > > One thing I'd like to mention is Flink's scala shell and sql client:
> > > although they are sub-modules of Flink, they can be treated as downstream
> > > projects which use Flink's client API. Currently it is not easy for them
> > > to integrate with Flink, and they share a lot of duplicated code. That is
> > > another sign that we should refactor the Flink client API.
> > >
> > > I believe it is a large and hard change, and I am afraid we cannot keep
> > > compatibility since many of the changes are user facing.
> > >
> > >
> > >
> > > On Mon, Jun 24, 2019 at 2:53 PM Zili Chen <wander4...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > After a closer look at our client APIs, I can see two major issues
> > > > for consistency and integration: the deployment of a job cluster
> > > > couples job graph creation with cluster deployment, and submission
> > > > via CliFrontend confuses the control flow of job graph compilation
> > > > and job submission. I'd like to follow the discussion above, mainly
> > > > the process described by Jeff and Stephan, and share my ideas on
> > > > these issues.
> > > >
> > > > 1) CliFrontend confuses the control flow of job compilation and
> > > > submission. Following the process of job submission that Stephan and
> > > > Jeff described, the execution environment knows all configs of the
> > > > cluster and the topology/settings of the job. Ideally, the main
> > > > method of the user program calls #execute (or #submit) and Flink
> > > > deploys the cluster, compiles the job graph and submits it to the
> > > > cluster. However, the current CliFrontend does all these things
> > > > inside its #runProgram method, which introduces a lot of subclasses
> > > > of (stream) execution environment.
> > > >
> > > > Actually, it sets up an exec env that hijacks the
> > > > #execute/executePlan method, initializes the job graph and aborts
> > > > execution. Control flow then returns to CliFrontend, which deploys
> > > > the cluster (or retrieves the client) and submits the job graph. This
> > > > is quite a specific internal process inside Flink and is consistent
> > > > with nothing else.
> > > >
> > > > 2) Deployment of a job cluster couples job graph creation and cluster
> > > > deployment. Abstractly, going from a user job to a concrete
> > > > submission requires the following dependency:
> > > >
> > > >      create JobGraph --\
> > > >
> > > > create ClusterClient -->  submit JobGraph
> > > >
> > > > The ClusterClient is created by deploying or retrieving a cluster.
> > > > JobGraph submission requires a compiled JobGraph and a valid
> > > > ClusterClient, but the creation of the ClusterClient is abstractly
> > > > independent of that of the JobGraph. However, in job cluster mode, we
> > > > deploy the job cluster with a job graph, which means we use another
> > > > process:
> > > >
> > > > create JobGraph --> deploy cluster with the JobGraph
> > > >
> > > > This is another inconsistency, and downstream projects/client APIs
> > > > are forced to handle the different cases with little support from
> > > > Flink.
> > > >
> > > > Since we have likely reached a consensus that
> > > >
> > > > 1. all configs are gathered in the Flink configuration and passed on
> > > > 2. the execution environment knows all configs and handles execution
> > > > (both deployment and submission)
> > > >
> > > > I propose eliminating the inconsistencies above with the following
> > > > approach:
> > > >
> > > > 1) CliFrontend should be exactly a front end, at least for the "run"
> > > > command. That means it just gathers all config from the command line
> > > > and passes it to the main method of the user program. The execution
> > > > environment knows all the info and, with an added utility for
> > > > ClusterClient, we gracefully get a ClusterClient by deploying or
> > > > retrieving. In this way, we don't need to hijack the
> > > > #execute/executePlan methods and can remove the various hacky
> > > > subclasses of exec env, as well as the #run methods in ClusterClient
> > > > (for an interface-ized ClusterClient). The control flow now goes from
> > > > CliFrontend to the main method and never returns.
> > > >
> > > > 2) A job cluster is a cluster for one specific job. From another
> > > > perspective, it is an ephemeral session. We could decouple the
> > > > deployment from the compiled job graph: start a session with an idle
> > > > timeout and then submit the job to it.
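To make the "deploy or retrieve, then submit" flow concrete, here is a rough Java sketch under stated assumptions. ClusterClients, InMemoryClusterClient, the simplified JobGraph and ClusterClient types, and the config keys `cluster.id` and `session.idle-timeout-ms` are all hypothetical names invented for this example. It only shows that job-cluster mode and session mode can share one code path once the job cluster is treated as an ephemeral session.

```java
import java.util.HashMap;
import java.util.Map;

class SubmissionFlowSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // No cluster id configured: an ephemeral session is deployed for the job.
        ClusterClient deployed = ClusterClients.deployOrRetrieve(conf);
        System.out.println(deployed.submit(new JobGraph("wordcount")));

        // Cluster id configured: the running cluster is retrieved instead.
        conf.put(ClusterClients.CLUSTER_ID_KEY, deployed.clusterId());
        ClusterClient retrieved = ClusterClients.deployOrRetrieve(conf);
        System.out.println(retrieved.submit(new JobGraph("pagerank")));
    }
}

// Hypothetical, heavily simplified stand-in for a compiled job graph.
final class JobGraph {
    final String name;
    JobGraph(String name) { this.name = name; }
}

// Hypothetical interface-ized cluster client: submission is its only concern here.
interface ClusterClient {
    String clusterId();
    String submit(JobGraph jobGraph);
}

// In-memory stand-in for a deployed cluster; nothing is actually started.
final class InMemoryClusterClient implements ClusterClient {
    private final String clusterId;
    private final long idleTimeoutMillis;
    private int submitted = 0;

    InMemoryClusterClient(String clusterId, long idleTimeoutMillis) {
        this.clusterId = clusterId;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    @Override public String clusterId() { return clusterId; }

    long idleTimeoutMillis() { return idleTimeoutMillis; }

    @Override public String submit(JobGraph jobGraph) {
        submitted++;
        return clusterId + "/" + jobGraph.name + "#" + submitted;
    }
}

// Utility that hides whether the client comes from a new or an existing cluster.
final class ClusterClients {
    static final String CLUSTER_ID_KEY = "cluster.id";                  // assumed key
    static final String IDLE_TIMEOUT_KEY = "session.idle-timeout-ms";   // assumed key
    private static final Map<String, ClusterClient> RUNNING = new HashMap<>();

    private ClusterClients() {}

    static ClusterClient deployOrRetrieve(Map<String, String> conf) {
        String clusterId = conf.get(CLUSTER_ID_KEY);
        if (clusterId != null) {
            ClusterClient existing = RUNNING.get(clusterId);
            if (existing == null) {
                throw new IllegalStateException("No running cluster with id " + clusterId);
            }
            return existing;
        }
        // Creation of the client does not need a JobGraph at all.
        long idleTimeout = Long.parseLong(conf.getOrDefault(IDLE_TIMEOUT_KEY, "60000"));
        String newId = "cluster-" + (RUNNING.size() + 1);
        ClusterClient client = new InMemoryClusterClient(newId, idleTimeout);
        RUNNING.put(newId, client);
        return client;
    }
}
```

The point of the sketch is that creating the ClusterClient never needs a JobGraph, which is the decoupling argued for above.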
> > > >
> > > > These topics are better discussed to reach a consensus before we go
> > > > into more details on design or implementation.
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > >
> > > > On Thu, Jun 20, 2019 at 3:21 AM Zili Chen <wander4...@gmail.com> wrote:
> > > >
> > > >> Hi Jeff,
> > > >>
> > > >> Thanks for raising this thread and the design document!
> > > >>
> > > >> As @Thomas Weise mentioned above, extending config in Flink
> > > >> requires far more effort than it should. Another example
> > > >> is that we achieve detached mode by introducing another execution
> > > >> environment which also hijacks the #execute method.
> > > >>
> > > >> I agree with your idea that the user configures everything
> > > >> and Flink "just" respects it. On this topic I think the unusual
> > > >> control flow when CliFrontend handles the "run" command is the problem.
> > > >> It handles several configs, mainly about cluster settings, and
> > > >> thus the main method of the user program is unaware of them. It also
> > > >> compiles the app to a job graph by running the main method with a
> > > >> hijacked exec env, which constrains the main method further.
> > > >>
> > > >> I'd like to write down a few notes on passing and respecting
> > > >> configs/args, as well as on decoupling job compilation and
> > > >> submission. I will share them on this thread later.
> > > >>
> > > >> Best,
> > > >> tison.
> > > >>
> > > >>
> > > >> On Mon, Jun 17, 2019 at 7:29 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
> > > >>
> > > >>> Hi Jeff and Flavio,
> > > >>>
> > > >>> Thanks a lot, Jeff, for proposing the design document.
> > > >>>
> > > >>> We are also working on refactoring ClusterClient to allow flexible
> > > >>> and efficient job management in our real-time platform.
> > > >>> We would like to draft a document to share our ideas with you.
> > > >>>
> > > >>> I think it's a good idea to have something like Apache Livy for
> > > >>> Flink, and the efforts discussed here will be a great step towards it.
> > > >>>
> > > >>> Regards,
> > > >>> Xiaogang
> > > >>>
> > > >>> On Mon, Jun 17, 2019 at 7:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > >>>
> > > >>> > Is there any possibility of having something like Apache Livy [1]
> > > >>> > for Flink as well in the future?
> > > >>> >
> > > >>> > [1] https://livy.apache.org/
> > > >>> >
> > > >>> > On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>> >
> > > >>> > > >>>  Any API we expose should not have dependencies on the runtime
> > > >>> > > (flink-runtime) package or other implementation details. To me,
> > > >>> > > this means that the current ClusterClient cannot be exposed to
> > > >>> > > users because it uses quite some classes from the optimiser and
> > > >>> > > runtime packages.
> > > >>> > >
> > > >>> > > We should change ClusterClient from a class to an interface.
> > > >>> > > ExecutionEnvironment would only use the ClusterClient interface,
> > > >>> > > which should be in flink-clients, while the concrete
> > > >>> > > implementation class could be in flink-runtime.
> > > >>> > >
> > > >>> > > >>> What happens when a failure/restart in the client happens?
> > > >>> > > There needs to be a way of re-establishing the connection to the
> > > >>> > > job, setting up the listeners again, etc.
> > > >>> > >
> > > >>> > > Good point. First we need to define what a failure/restart in the
> > > >>> > > client means. IIUC, that usually means a network failure, which
> > > >>> > > would happen in class RestClient. If my understanding is correct,
> > > >>> > > the restart/retry mechanism should be implemented in RestClient.
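As a sketch of what "retry in the REST layer" could look like, assuming failures surface as IOException: the helper below retries an I/O call with exponential backoff. RetrySketch, IoCall and callWithRetry are hypothetical names made up for this example; this is not how Flink's RestClient is implemented.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class RetrySketch {
    // An operation that may fail with a (possibly transient) I/O error.
    interface IoCall<T> {
        T call() throws IOException;
    }

    // Retries the call up to maxAttempts times, doubling the wait after each failure.
    static <T> T callWithRetry(IoCall<T> call, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw new UncheckedIOException("Giving up after " + attempt + " attempts", e);
                }
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated status query that fails twice before succeeding.
        String status = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection reset");
            }
            return "RUNNING";
        }, 5, 10L);
        System.out.println(status + " after " + calls[0] + " calls");
    }
}
```

Note that this only covers transient network errors. Re-attaching listeners after a full client restart, as raised in the question, would need state beyond a retry loop.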
> > > >>> > >
> > > >>> > >
> > > >>> > >
> > > >>> > >
> > > >>> > >
> > > >>> > > On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > >>> > >
> > > >>> > > > Some points to consider:
> > > >>> > > >
> > > >>> > > > * Any API we expose should not have dependencies on the runtime
> > > >>> > > > (flink-runtime) package or other implementation details. To me,
> > > >>> > > > this means that the current ClusterClient cannot be exposed to
> > > >>> > > > users because it uses quite some classes from the optimiser and
> > > >>> > > > runtime packages.
> > > >>> > > >
> > > >>> > > > * What happens when a failure/restart in the client happens?
> > > >>> > > > There needs to be a way of re-establishing the connection to the
> > > >>> > > > job, setting up the listeners again, etc.
> > > >>> > > >
> > > >>> > > > Aljoscha
> > > >>> > > >
> > > >>> > > > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>> > > > >
> > > >>> > > > > Sorry folks, the design doc is later than you expected. Here's
> > > >>> > > > > the design doc I drafted; any comments and feedback are welcome.
> > > >>> > > > >
> > > >>> > > > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > > On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> > > >>> > > > >
> > > >>> > > > >> Nice that this discussion is happening.
> > > >>> > > > >>
> > > >>> > > > >> In the FLIP, we could also revisit the entire role of the
> > > >>> > > > >> environments again.
> > > >>> > > > >>
> > > >>> > > > >> Initially, the idea was:
> > > >>> > > > >>  - the environments take care of the specific setup for
> > > >>> > > > >> standalone (no setup needed), yarn, mesos, etc.
> > > >>> > > > >>  - the session ones have control over the session. The
> > > >>> > > > >> environment holds the session client.
> > > >>> > > > >>  - running a job gives a "control" object for that job. That
> > > >>> > > > >> behavior is the same in all environments.
> > > >>> > > > >>
> > > >>> > > > >> The actual implementation diverged quite a bit from that. Happy
> > > >>> > > > >> to see a discussion about straightening this out a bit more.
> > > >>> > > > >>
> > > >>> > > > >>
> > > >>> > > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>> > > > >>
> > > >>> > > > >>> Hi folks,
> > > >>> > > > >>>
> > > >>> > > > >>> Sorry for the late response. It seems we have reached consensus
> > > >>> > > > >>> on this; I will create a FLIP for it with a more detailed design.
> > > >>> > > > >>>
> > > >>> > > > >>>
> > > >>> > > > >>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> > > >>> > > > >>>
> > > >>> > > > >>>> Great to see this discussion seeded! The problems you face with the
> > > >>> > > > >>>> Zeppelin integration are also affecting other downstream projects,
> > > >>> > > > >>>> like Beam.
> > > >>> > > > >>>>
> > > >>> > > > >>>> We just enabled the savepoint restore option in
> > > >>> > > > >>>> RemoteStreamEnvironment [1] and that was more difficult than it should
> > > >>> > > > >>>> be. The main issue is that environment and cluster client aren't
> > > >>> > > > >>>> decoupled. Ideally it should be possible to just get the matching
> > > >>> > > > >>>> cluster client from the environment and then control the job through
> > > >>> > > > >>>> it (environment as factory for cluster client). But note that the
> > > >>> > > > >>>> environment classes are part of the public API, and it is not
> > > >>> > > > >>>> straightforward to make larger changes without breaking backward
> > > >>> > > > >>>> compatibility.
> > > >>> > > > >>>>
> > > >>> > > > >>>> ClusterClient currently exposes internal classes like JobGraph and
> > > >>> > > > >>>> StreamGraph. But it should be possible to wrap this with a new public
> > > >>> > > > >>>> API that brings the required job control capabilities for downstream
> > > >>> > > > >>>> projects. Perhaps it is helpful to look at some of the interfaces in
> > > >>> > > > >>>> Beam while thinking about this: [2] for the portable job API and [3]
> > > >>> > > > >>>> for the old asynchronous job control from the Beam Java SDK.
> > > >>> > > > >>>>
> > > >>> > > > >>>> The backward compatibility discussion [4] is also relevant here. A new
> > > >>> > > > >>>> API should shield downstream projects from internals and allow them to
> > > >>> > > > >>>> interoperate with multiple future Flink versions in the same release
> > > >>> > > > >>>> line without forced upgrades.
> > > >>> > > > >>>>
> > > >>> > > > >>>> Thanks,
> > > >>> > > > >>>> Thomas
> > > >>> > > > >>>>
> > > >>> > > > >>>> [1] https://github.com/apache/flink/pull/7249
> > > >>> > > > >>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > >>> > > > >>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > >>> > > > >>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > >>> > > > >>>>
> > > >>> > > > >>>>
> > > >>> > > > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>> > > > >>>>
> > > >>> > > > >>>>> >>> I'm not so sure whether the user should be able to define where
> > > >>> > > > >>>>> the job runs (in your example Yarn). This is actually independent of
> > > >>> > > > >>>>> the job development and is something which is decided at deployment
> > > >>> > > > >>>>> time.
> > > >>> > > > >>>>>
> > > >>> > > > >>>>> Users don't need to specify the execution mode programmatically. They
> > > >>> > > > >>>>> can also pass the execution mode via the arguments of the flink run
> > > >>> > > > >>>>> command, e.g.
> > > >>> > > > >>>>>
> > > >>> > > > >>>>> bin/flink run -m yarn-cluster ....
> > > >>> > > > >>>>> bin/flink run -m local ...
> > > >>> > > > >>>>> bin/flink run -m host:port ...
> > > >>> > > > >>>>>
> > > >>> > > > >>>>> Does this make sense to you?
> > > >>> > > > >>>>>
> > > >>> > > > >>>>> >>> To me it makes sense that the ExecutionEnvironment is not directly
> > > >>> > > > >>>>> initialized by the user and is instead context sensitive to how you
> > > >>> > > > >>>>> want to execute your job (Flink CLI vs. IDE, for example).
> > > >>> > > > >>>>>
> > > >>> > > > >>>>> Right, currently I notice Flink creates a different
> > > >>> > > > >>>>> ContextExecutionEnvironment based on the submission scenario (Flink
> > > >>> > > > >>>>> CLI vs. IDE). To me this is kind of a hack, not so straightforward.
> > > >>> > > > >>>>> What I suggested above is that Flink should always create the same
> > > >>> > > > >>>>> ExecutionEnvironment but with a different configuration, and based on
> > > >>> > > > >>>>> the configuration it would create the proper ClusterClient for the
> > > >>> > > > >>>>> different behaviors.
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>
> > > >>> > > > >>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >>> > > > >>>>>
> > > >>> > > > >>>>>> You are probably right that we have code duplication when it comes to
> > > >>> > > > >>>>>> the creation of the ClusterClient. This should be reduced in the
> > > >>> > > > >>>>>> future.
> > > >>> > > > >>>>>>
> > > >>> > > > >>>>>> I'm not so sure whether the user should be able to define where the
> > > >>> > > > >>>>>> job runs (in your example Yarn). This is actually independent of the
> > > >>> > > > >>>>>> job development and is something which is decided at deployment time.
> > > >>> > > > >>>>>> To me it makes sense that the ExecutionEnvironment is not directly
> > > >>> > > > >>>>>> initialized by the user and is instead context sensitive to how you
> > > >>> > > > >>>>>> want to execute your job (Flink CLI vs. IDE, for example). However, I
> > > >>> > > > >>>>>> agree that the ExecutionEnvironment should give you access to the
> > > >>> > > > >>>>>> ClusterClient and to the job (maybe in the form of the JobGraph or a
> > > >>> > > > >>>>>> job plan).
> > > >>> > > > >>>>>>
> > > >>> > > > >>>>>> Cheers,
> > > >>> > > > >>>>>> Till
> > > >>> > > > >>>>>>
> > > >>> > > > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>> > > > >>>>>>
> > > >>> > > > >>>>>>> Hi Till,
> > > >>> > > > >>>>>>> Thanks for the feedback. You are right that I expect a better
> > > >>> > > > >>>>>>> programmatic job submission/control API which could be used by
> > > >>> > > > >>>>>>> downstream projects, and it would benefit the Flink ecosystem. When I
> > > >>> > > > >>>>>>> look at the code of Flink's scala-shell and sql-client (I believe they
> > > >>> > > > >>>>>>> are not the core of Flink, but belong to its ecosystem), I find a lot
> > > >>> > > > >>>>>>> of duplicated code for creating a ClusterClient from user provided
> > > >>> > > > >>>>>>> configuration (the configuration format may differ between scala-shell
> > > >>> > > > >>>>>>> and sql-client) and then using that ClusterClient to manipulate jobs.
> > > >>> > > > >>>>>>> I don't think this is convenient for downstream projects. What I
> > > >>> > > > >>>>>>> expect is that a downstream project only needs to provide the
> > > >>> > > > >>>>>>> necessary configuration info (maybe by introducing a class FlinkConf)
> > > >>> > > > >>>>>>> and then build an ExecutionEnvironment based on this FlinkConf, and the
> > > >>> > > > >>>>>>> ExecutionEnvironment will create the proper ClusterClient. This would
> > > >>> > > > >>>>>>> not only benefit downstream project development but also help their
> > > >>> > > > >>>>>>> integration tests with Flink. Here's a sample code snippet of what I
> > > >>> > > > >>>>>>> expect.
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>> val conf = new FlinkConf().mode("yarn")
> > > >>> > > > >>>>>>> val env = new ExecutionEnvironment(conf)
> > > >>> > > > >>>>>>> val jobId = env.submit(...)
> > > >>> > > > >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > >>> > > > >>>>>>> env.getClusterClient().cancelJob(jobId)
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>> What do you think ?
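A self-contained Java rendering of the snippet above, for illustration only. Every type here (FlinkConf, ExecutionEnvironment, ClusterClient, InMemoryClusterClient, JobStatus) is a hypothetical stand-in written for this sketch, and the in-memory client does not talk to any cluster. It just shows the environment choosing its ClusterClient from the configured mode.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigDrivenEnvSketch {
    public static void main(String[] args) {
        FlinkConf conf = new FlinkConf().mode("yarn");
        ExecutionEnvironment env = new ExecutionEnvironment(conf);
        String jobId = env.submit("wordcount");
        System.out.println(env.getClusterClient().queryJobStatus(jobId));
        env.getClusterClient().cancelJob(jobId);
        System.out.println(env.getClusterClient().queryJobStatus(jobId));
    }
}

// Hypothetical configuration holder; only the execution mode is modelled.
final class FlinkConf {
    private String mode = "local";
    FlinkConf mode(String mode) { this.mode = mode; return this; }
    String mode() { return mode; }
}

enum JobStatus { RUNNING, CANCELED }

// Hypothetical job-control surface exposed through the environment.
interface ClusterClient {
    String submitJob(String jobName);
    JobStatus queryJobStatus(String jobId);
    void cancelJob(String jobId);
}

// In-memory stand-in; a real client would talk to YARN, a REST endpoint, etc.
final class InMemoryClusterClient implements ClusterClient {
    private final String target;
    private final Map<String, JobStatus> jobs = new HashMap<>();

    InMemoryClusterClient(String target) { this.target = target; }

    @Override public String submitJob(String jobName) {
        String jobId = target + "-" + jobName + "-" + (jobs.size() + 1);
        jobs.put(jobId, JobStatus.RUNNING);
        return jobId;
    }

    @Override public JobStatus queryJobStatus(String jobId) {
        JobStatus status = jobs.get(jobId);
        if (status == null) {
            throw new IllegalArgumentException("Unknown job " + jobId);
        }
        return status;
    }

    @Override public void cancelJob(String jobId) {
        queryJobStatus(jobId); // fails fast for unknown jobs
        jobs.put(jobId, JobStatus.CANCELED);
    }
}

// One environment class; behavior differs only through the configuration.
final class ExecutionEnvironment {
    private final ClusterClient clusterClient;

    ExecutionEnvironment(FlinkConf conf) { this.clusterClient = createClient(conf); }

    private static ClusterClient createClient(FlinkConf conf) {
        switch (conf.mode()) {
            case "local":
            case "yarn":
            case "remote":
                return new InMemoryClusterClient(conf.mode());
            default:
                throw new IllegalArgumentException("Unknown mode: " + conf.mode());
        }
    }

    // Non-blocking: returns the job id instead of waiting for completion.
    String submit(String jobName) { return clusterClient.submitJob(jobName); }

    ClusterClient getClusterClient() { return clusterClient; }
}
```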
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >>> > > > >>>>>>>
> > > >>> > > > >>>>>>>> Hi Jeff,
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>> what you are proposing is to provide the user with better programmatic
> > > >>> > > > >>>>>>>> job control. There was actually an effort to achieve this but it has
> > > >>> > > > >>>>>>>> never been completed [1]. However, there are some improvements in the
> > > >>> > > > >>>>>>>> code base now. Look for example at the NewClusterClient interface,
> > > >>> > > > >>>>>>>> which offers a non-blocking job submission. But I agree that we need
> > > >>> > > > >>>>>>>> to improve Flink in this regard.
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>> I would not be in favour of exposing all ClusterClient calls via the
> > > >>> > > > >>>>>>>> ExecutionEnvironment because it would clutter the class and would not
> > > >>> > > > >>>>>>>> be a good separation of concerns. Instead, one idea could be to
> > > >>> > > > >>>>>>>> retrieve the current ClusterClient from the ExecutionEnvironment,
> > > >>> > > > >>>>>>>> which can then be used for cluster and job control. But before we
> > > >>> > > > >>>>>>>> start an effort here, we need to agree on and capture what
> > > >>> > > > >>>>>>>> functionality we want to provide.
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor describing
> > > >>> > > > >>>>>>>> how to talk to a cluster manager like Yarn or Mesos. The
> > > >>> > > > >>>>>>>> ClusterDescriptor can be used for deploying Flink clusters (job and
> > > >>> > > > >>>>>>>> session) and gives you a ClusterClient. The ClusterClient controls the
> > > >>> > > > >>>>>>>> cluster (e.g. submitting jobs, listing all running jobs). And then
> > > >>> > > > >>>>>>>> there was the idea to introduce a JobClient which you obtain from the
> > > >>> > > > >>>>>>>> ClusterClient to trigger job specific operations (e.g. taking a
> > > >>> > > > >>>>>>>> savepoint, cancelling the job).
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>> Cheers,
> > > >>> > > > >>>>>>>> Till
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>> > > > >>>>>>>>
> > > >>> > > > >>>>>>>>> Hi Folks,
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is an
> > > >>> > > > >>>>>>>>> interactive notebook, and I hit several issues caused by the Flink
> > > >>> > > > >>>>>>>>> client API. So I'd like to propose the following changes to the Flink
> > > >>> > > > >>>>>>>>> client API.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> 1. Support nonblocking execution. Currently,
> > > >>> > > > >>>>>>>>> ExecutionEnvironment#execute is a blocking method which does 2
> > > >>> > > > >>>>>>>>> things: it first submits the job and then waits until the job is
> > > >>> > > > >>>>>>>>> finished. I'd like to introduce a nonblocking execution method like
> > > >>> > > > >>>>>>>>> ExecutionEnvironment#submit which only submits the job and then
> > > >>> > > > >>>>>>>>> returns the jobId to the client, and allow the user to query the job
> > > >>> > > > >>>>>>>>> status via the jobId.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> 2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment.
> > > >>> > > > >>>>>>>>> Currently the only way to cancel a job is via the CLI (bin/flink),
> > > >>> > > > >>>>>>>>> which is not convenient for downstream projects. So I'd like to add a
> > > >>> > > > >>>>>>>>> cancel API in ExecutionEnvironment.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> 3. Add a savepoint API in
> > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. It is similar to the
> > > >>> > > > >>>>>>>>> cancel API: we should use ExecutionEnvironment as the unified API for
> > > >>> > > > >>>>>>>>> third parties to integrate with Flink.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> 4. Add a listener for the job execution lifecycle, something like the
> > > >>> > > > >>>>>>>>> following, so that downstream projects can run custom logic during the
> > > >>> > > > >>>>>>>>> lifecycle of a job. E.g. Zeppelin would capture the jobId after the
> > > >>> > > > >>>>>>>>> job is submitted and then use this jobId to cancel it later when
> > > >>> > > > >>>>>>>>> necessary.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> public interface JobListener {
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>   void onJobSubmitted(JobID jobId);
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>   void onJobExecuted(JobExecutionResult jobResult);
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>   void onJobCanceled(JobID jobId);
> > > >>> > > > >>>>>>>>> }
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently this is
> > > >>> > > > >>>>>>>>> disabled, but a session is very convenient for third parties that
> > > >>> > > > >>>>>>>>> submit jobs continually. I hope Flink can enable it again.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> 6. Unify all flink client api into
> > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> This is a long term issue which needs more careful thinking and
> > > >>> > > > >>>>>>>>> design. Currently some of Flink's features are exposed in
> > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed
> > > >>> > > > >>>>>>>>> in the CLI instead of the API, like the cancel and savepoint I
> > > >>> > > > >>>>>>>>> mentioned above. I think the root cause is that Flink didn't unify the
> > > >>> > > > >>>>>>>>> way clients interact with it. Here I list 3 scenarios of Flink
> > > >>> > > > >>>>>>>>> operation:
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>   - Local job execution. Flink will create a LocalEnvironment and then
> > > >>> > > > >>>>>>>>>   use this LocalEnvironment to create a LocalExecutor for job
> > > >>> > > > >>>>>>>>>   execution.
> > > >>> > > > >>>>>>>>>   - Remote job execution. Flink will create a ClusterClient first, then
> > > >>> > > > >>>>>>>>>   create a ContextEnvironment based on the ClusterClient, and then run
> > > >>> > > > >>>>>>>>>   the job.
> > > >>> > > > >>>>>>>>>   - Job cancelation. Flink will create a ClusterClient first and then
> > > >>> > > > >>>>>>>>>   cancel the job via this ClusterClient.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> As the three scenarios above show, Flink does not use the same
> > > >>> > > > >>>>>>>>> approach (code path) to interact with Flink in each case.
> > > >>> > > > >>>>>>>>> What I propose is the following:
> > > >>> > > > >>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based on user
> > > >>> > > > >>>>>>>>> configuration) --> Use this Environment to create the proper
> > > >>> > > > >>>>>>>>> ClusterClient (LocalClusterClient or RestClusterClient) to interact
> > > >>> > > > >>>>>>>>> with Flink (job execution or cancellation).
> > > >>> > > > >>>>>>>>>
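The proposed flow (configuration --> Environment --> ClusterClient) could be sketched roughly as below. This is only an illustration under loud assumptions: every type is a simplified in-memory stand-in rather than the real Flink class of the same name, and the configuration keys "execution.target" and "rest.address" are placeholders chosen for this example, not keys defined anywhere in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: all types are simplified stand-ins.
final class UnifiedClientSketch {

    /** Stand-in for the client that talks to a local or remote cluster. */
    interface ClusterClient {
        String submitJob(String jobName); // returns a job id
        boolean cancel(String jobId);     // true if the job was running
    }

    /** Shared bookkeeping; a real client would call into a cluster. */
    abstract static class InMemoryClient implements ClusterClient {
        private final Map<String, String> running = new HashMap<>();
        private final String idPrefix;
        private int nextId = 1;

        InMemoryClient(String idPrefix) { this.idPrefix = idPrefix; }

        @Override public String submitJob(String jobName) {
            String jobId = idPrefix + "-" + nextId++;
            running.put(jobId, jobName);
            return jobId;
        }

        @Override public boolean cancel(String jobId) {
            return running.remove(jobId) != null;
        }
    }

    static final class LocalClusterClient extends InMemoryClient {
        LocalClusterClient() { super("local"); }
    }

    static final class RestClusterClient extends InMemoryClient {
        final String address;
        RestClusterClient(String address) { super("rest"); this.address = address; }
    }

    /** Stand-in for ExecutionEnvironment: the single entry point. */
    static final class ExecutionEnvironment {
        private final ClusterClient client;

        private ExecutionEnvironment(ClusterClient client) { this.client = client; }

        // The environment, not the caller, picks the client from configuration.
        // Both configuration keys are placeholders for this sketch.
        static ExecutionEnvironment create(Map<String, String> config) {
            String target = config.getOrDefault("execution.target", "local");
            switch (target) {
                case "local":
                    return new ExecutionEnvironment(new LocalClusterClient());
                case "remote":
                    String address = config.get("rest.address");
                    if (address == null) {
                        throw new IllegalArgumentException("rest.address is required for remote execution");
                    }
                    return new ExecutionEnvironment(new RestClusterClient(address));
                default:
                    throw new IllegalArgumentException("unknown target: " + target);
            }
        }

        ClusterClient client() { return client; }
        String execute(String jobName) { return client.submitJob(jobName); }
        boolean cancel(String jobId) { return client.cancel(jobId); }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("execution.target", "remote");
        config.put("rest.address", "localhost:8081");
        ExecutionEnvironment env = ExecutionEnvironment.create(config);
        String jobId = env.execute("word-count");
        System.out.println("submitted " + jobId + ", cancelled: " + env.cancel(jobId));
    }
}
```

The point of the sketch is that execution and cancellation go through the same ClusterClient obtained from the environment, so a caller (including the CLI) only ever supplies configuration.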
> > > >>> > > > >>>>>>>>> This way we can unify the process of local execution and remote
> > > >>> > > > >>>>>>>>> execution. It also becomes much easier for third parties to
> > > >>> > > > >>>>>>>>> integrate with Flink, because ExecutionEnvironment is the unified
> > > >>> > > > >>>>>>>>> entry point for Flink. All a third party needs to do is pass
> > > >>> > > > >>>>>>>>> configuration to ExecutionEnvironment, and ExecutionEnvironment
> > > >>> > > > >>>>>>>>> will do the right thing based on that configuration. The Flink CLI
> > > >>> > > > >>>>>>>>> can also be considered a consumer of the Flink API: it just passes
> > > >>> > > > >>>>>>>>> the configuration to ExecutionEnvironment and lets
> > > >>> > > > >>>>>>>>> ExecutionEnvironment create the proper ClusterClient, instead of
> > > >>> > > > >>>>>>>>> the CLI creating the ClusterClient directly.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> Item 6 would involve large code refactoring, so I think we can
> > > >>> > > > >>>>>>>>> defer it to a future release. Items 1, 2, 3, 4 and 5 could be done
> > > >>> > > > >>>>>>>>> at once, I believe. Let me know your comments and feedback, thanks.
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> --
> > > >>> > > > >>>>>>>>> Best Regards
> > > >>> > > > >>>>>>>>>
> > > >>> > > > >>>>>>>>> Jeff Zhang
> > > >>> > > > >>>>>>>>>


-- 
Best Regards

Jeff Zhang
