Hi Aljoscha,

Thanks for your reply and participation. The Google Doc you linked to
requires permission; I think you could use a share link instead.

I agree that we have almost reached a consensus that a JobClient is necessary
to interact with a running job.

Let me check your open questions one by one.

1. Separate cluster creation and job submission for per-job mode.

As you mentioned, this is where the opinions diverge. In my document there is
an alternative[2] that proposes excluding per-job deployment from the scope of
the client API, and I now find it more reasonable to do this exclusion.

In per-job mode, a dedicated JobCluster is launched to execute the
specific job. It is more like a Flink application than a submission
of a Flink job. The client only takes care of job submission and assumes
there is an existing cluster. In this way we are able to consider per-job
issues separately, and JobClusterEntrypoint would be the utility class for
per-job deployment.

Nevertheless, a user program works in both session mode and per-job mode
without any code changes. In per-job mode, a JobClient is returned from
env.execute as usual. However, it would no longer be a wrapper of
RestClusterClient but a wrapper of PerJobClusterClient, which communicates
with the Dispatcher locally.
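
To illustrate why the user program would not need to change, here is a rough
sketch. It is not a proposal for the final API: the JobClient methods, the
DispatcherAccess interface and both stub classes are hypothetical names made
up for this example, and the stubs only stand in for RestClusterClient and
PerJobClusterClient.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical job handle; the method set is an assumption for illustration. */
interface JobClient {
    String getJobId();
    CompletableFuture<String> getJobStatus();
}

/** Hypothetical abstraction over whatever talks to the Dispatcher. */
interface DispatcherAccess {
    CompletableFuture<String> requestJobStatus(String jobId);
}

/** Stand-in for the session case, where RestClusterClient would be wrapped. */
final class RestAccessStub implements DispatcherAccess {
    @Override
    public CompletableFuture<String> requestJobStatus(String jobId) {
        return CompletableFuture.completedFuture("RUNNING (asked via REST)");
    }
}

/** Stand-in for the per-job case, where PerJobClusterClient would be wrapped. */
final class LocalAccessStub implements DispatcherAccess {
    @Override
    public CompletableFuture<String> requestJobStatus(String jobId) {
        return CompletableFuture.completedFuture("RUNNING (asked locally)");
    }
}

/** One JobClient implementation; only the wrapped access object differs per mode. */
final class WrappingJobClient implements JobClient {
    private final String jobId;
    private final DispatcherAccess access;

    WrappingJobClient(String jobId, DispatcherAccess access) {
        this.jobId = jobId;
        this.access = access;
    }

    @Override
    public String getJobId() {
        return jobId;
    }

    @Override
    public CompletableFuture<String> getJobStatus() {
        return access.requestJobStatus(jobId);
    }
}

final class JobClientSketch {
    /** User code: identical in both modes because it only sees JobClient. */
    static String describe(JobClient client) {
        return client.getJobId() + ": " + client.getJobStatus().join();
    }

    public static void main(String[] args) {
        System.out.println(describe(new WrappingJobClient("job-1", new RestAccessStub())));
        System.out.println(describe(new WrappingJobClient("job-1", new LocalAccessStub())));
    }
}
```

The point of the sketch is only that the mode decides which object gets
wrapped, while the code after env.execute stays the same.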

2. How to deal with plan preview.

With the env.compile functions, users can get a JobGraph or FlinkPlan and thus
preview the plan programmatically. Typically it looks like

if (preview configured) {
    FlinkPlan plan = env.compile();
    new JSONDumpGenerator(...).dump(plan);
} else {
    env.execute();
}

With that, `flink info` would no longer be valid.

3. How to deal with Jar Submission at the Web Frontend.

There is another thread discussing this topic[1]. Apart from removing
the functionality, there are two alternatives.

One is to introduce an interface with a method that returns a
JobGraph/FlinkPlan, and have Jar Submission only support main classes that
implement this interface. We then extract the JobGraph/FlinkPlan just by
calling that method. In this way, it is even possible to consider separating
job creation from job submission.
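
A rough sketch of this first alternative, under stated assumptions:
JobGraphProvider, JobGraphPlaceholder and JarSubmissionSketch are hypothetical
names, and the placeholder class only stands in for the real JobGraph so that
the example compiles on its own. The jar handler instantiates the entry class
and asks it for the graph, without running a main method.

```java
import java.util.Arrays;
import java.util.List;

/** Placeholder standing in for Flink's JobGraph so the sketch compiles alone. */
final class JobGraphPlaceholder {
    final String jobName;
    final List<String> vertices;

    JobGraphPlaceholder(String jobName, List<String> vertices) {
        this.jobName = jobName;
        this.vertices = vertices;
    }
}

/** Hypothetical interface that a submittable main class would implement. */
interface JobGraphProvider {
    JobGraphPlaceholder createJobGraph(String[] args);
}

/** Example user class: it describes the job but does not execute it. */
final class WordCountProgram implements JobGraphProvider {
    public WordCountProgram() {}

    @Override
    public JobGraphPlaceholder createJobGraph(String[] args) {
        String name = args.length > 0 ? args[0] : "wordcount";
        return new JobGraphPlaceholder(name, Arrays.asList("source", "tokenize", "sum", "sink"));
    }
}

final class JarSubmissionSketch {
    /** What a jar handler could do: instantiate the entry class and ask it for the graph. */
    static JobGraphPlaceholder extract(Class<?> entryClass, String[] args) {
        if (!JobGraphProvider.class.isAssignableFrom(entryClass)) {
            throw new IllegalArgumentException(
                    entryClass.getName() + " does not implement JobGraphProvider");
        }
        try {
            JobGraphProvider provider =
                    (JobGraphProvider) entryClass.getDeclaredConstructor().newInstance();
            return provider.createJobGraph(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + entryClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        JobGraphPlaceholder graph = extract(WordCountProgram.class, new String[] {"my-job"});
        System.out.println(graph.jobName + " " + graph.vertices);
    }
}
```

Since no user main method runs on the submission path here, nothing has to be
hijacked to obtain the graph.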

The other is, as you mentioned, to let execute() do the actual execution.
We won't execute the main method in the WebFrontend but spawn a process
on the WebMonitor side to execute it. As for the return value, we could
generate the JobID in the WebMonitor and pass it to the execution environment.

4. How to deal with detached mode.

I think detached mode is a temporary solution for non-blocking submission.
In my document both submission and execution return a CompletableFuture, and
users control whether or not to wait for the result. At this point we don't
need a detached option, but the functionality is still covered.
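
As a small sketch of what I mean (the class and the submit/execute methods
below are hypothetical and only simulate a cluster; the strings they return
are made up): the caller decides whether to behave "detached" or "attached"
purely by choosing which future to wait on.

```java
import java.util.concurrent.CompletableFuture;

final class FutureSubmissionSketch {
    /** Hypothetical submission: completes with a job id once the job is accepted. */
    static CompletableFuture<String> submit(String jobName) {
        return CompletableFuture.supplyAsync(() -> "id-of-" + jobName);
    }

    /** Hypothetical execution: completes only when the job itself has finished. */
    static CompletableFuture<String> execute(String jobName) {
        return submit(jobName).thenApply(jobId -> jobId + " FINISHED");
    }

    public static void main(String[] args) {
        // "Detached" behaviour: wait for acceptance only, then move on.
        String jobId = submit("wordcount").join();
        System.out.println("submitted " + jobId);

        // "Attached" behaviour: wait for the result of the whole execution.
        String result = execute("wordcount").join();
        System.out.println(result);
    }
}
```

No separate DetachedEnvironment is involved in this sketch; the difference is
only in what the user chooses to wait for.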

5. How does per-job mode interact with interactive programming.

The YARN, Mesos and Kubernetes scenarios all follow the pattern of launching a
JobCluster now, so I don't think there would be any inconsistency between the
different resource managers.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
[2]
https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?disco=AAAADZaGGfs

On Fri, Aug 16, 2019 at 9:20 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> I read both Jeff's initial design document and the newer document by Tison.
> I also finally found the time to collect our thoughts on the issue. I had
> quite some discussions with Kostas and this is the result: [1].
>
> I think overall we agree that this part of the code is in dire need of
> some refactoring/improvements but I think there are still some open
> questions and some differences in opinion what those refactorings should
> look like.
>
> I think the API-side is quite clear, i.e. we need some JobClient API that
> allows interacting with a running Job. It could be worthwhile to spin that
> off into a separate FLIP because we can probably find consensus on that
> part more easily.
>
> For the rest, the main open questions from our doc are these:
>
>   - Do we want to separate cluster creation and job submission for per-job
> mode? In the past, there were conscious efforts to *not* separate job
> submission from cluster creation for per-job clusters for Mesos, YARN,
> Kubernetes (see StandaloneJobClusterEntryPoint). Tison suggests in his
> design document to decouple this in order to unify job submission.
>
>   - How to deal with plan preview, which needs to hijack execute() and let
> the outside code catch an exception?
>
>   - How to deal with Jar Submission at the Web Frontend, which needs to
> hijack execute() and let the outside code catch an exception?
> CliFrontend.run() “hijacks” ExecutionEnvironment.execute() to get a
> JobGraph and then execute that JobGraph manually. We could get around that
> by letting execute() do the actual execution. One caveat for this is that
> now the main() method doesn’t return (or is forced to return by throwing an
> exception from execute()) which means that for Jar Submission from the
> WebFrontend we have a long-running main() method running in the
> WebFrontend. This doesn’t sound very good. We could get around this by
> removing the plan preview feature and by removing Jar Submission/Running.
>
>   - How to deal with detached mode? Right now, DetachedEnvironment will
> execute the job and return immediately. If users control when they want to
> return, by waiting on the job completion future, how do we deal with this?
> Do we simply remove the distinction between detached/non-detached?
>
>   - How does per-job mode interact with “interactive programming”
> (FLIP-36). For YARN, each execute() call could spawn a new Flink YARN
> cluster. What about Mesos and Kubernetes?
>
> The first open question is where the opinions diverge, I think. The rest
> are just open questions and interesting things that we need to consider.
>
> Best,
> Aljoscha
>
> [1]
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
>
> > On 31. Jul 2019, at 15:23, Jeff Zhang <zjf...@gmail.com> wrote:
> >
> > Thanks tison for the effort. I left a few comments.
> >
> >
> > On Wed, Jul 31, 2019 at 8:24 PM Zili Chen <wander4...@gmail.com> wrote:
> >
> >> Hi Flavio,
> >>
> >> Thanks for your reply.
> >>
> >> In both the current implementation and the design, ClusterClient
> >> never takes responsibility for generating the JobGraph.
> >> (What you see in the current codebase are several class methods.)
> >>
> >> Instead, user describes his program in the main method
> >> with ExecutionEnvironment apis and calls env.compile()
> >> or env.optimize() to get FlinkPlan and JobGraph respectively.
> >>
> >> For listing the main classes in a jar and choosing one for
> >> submission, you're now able to customize a CLI to do it.
> >> Specifically, the path of the jar is passed as an argument, and
> >> in the customized CLI you list the main classes and choose one
> >> to submit to the cluster.
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> On Wed, Jul 31, 2019 at 8:12 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>
> >>> Just one note on my side: it is not clear to me whether the client needs
> >>> to be able to generate a job graph or not.
> >>> In my opinion, the job jar must reside only on the server/jobManager side
> >>> and the client requires a way to get the job graph.
> >>> If you really want access to the job graph, I'd add a dedicated method
> >>> on the ClusterClient, like:
> >>>
> >>>   - getJobGraph(jarId, mainClass): JobGraph
> >>>   - listMainClasses(jarId): List<String>
> >>>
> >>> These would also require some additions on the job manager endpoint.
> >>> What do you think?
> >>>
> >>> On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> Here is a document[1] on client api enhancement from our perspective.
> >>>> We have investigated current implementations. And we propose
> >>>>
> >>>> 1. Unify the implementation of cluster deployment and job submission in
> >>>> Flink.
> >>>> 2. Provide programmatic interfaces to allow flexible job and cluster
> >>>> management.
> >>>>
> >>>> The first proposal aims at reducing the code paths of cluster deployment
> >>>> and job submission so that one can adopt Flink easily. The second
> >>>> proposal aims at providing rich interfaces for advanced users
> >>>> who want precise control over these stages.
> >>>>
> >>>> Quick reference on open questions:
> >>>>
> >>>> 1. Exclude job cluster deployment from the client side, or redefine the
> >>>> semantics of job cluster? It fits in a process quite different from
> >>>> session cluster deployment and job submission.
> >>>>
> >>>> 2. Maintain the code paths handling class o.a.f.api.common.Program, or
> >>>> implement customized program handling logic in a customized CliFrontend?
> >>>> See also this thread[2] and the document[1].
> >>>>
> >>>> 3. Expose ClusterClient as public API, or just expose API in
> >>>> ExecutionEnvironment and delegate it to ClusterClient? Further, in either
> >>>> case, is it worth introducing a JobClient which is an encapsulation of
> >>>> ClusterClient associated with a specific job?
> >>>>
> >>>> Best,
> >>>> tison.
> >>>>
> >>>> [1]
> >>>> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
> >>>> [2]
> >>>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
> >>>>
> >>>> On Wed, Jul 24, 2019 at 9:19 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>
> >>>>> Thanks Stephan, I will follow up on this issue in the next few weeks and
> >>>>> will refine the design doc. We could discuss more details after the 1.9
> >>>>> release.
> >>>>>
> >>>>> On Wed, Jul 24, 2019 at 12:58 AM Stephan Ewen <se...@apache.org> wrote:
> >>>>>
> >>>>>> Hi all!
> >>>>>>
> >>>>>> This thread has stalled for a bit, which I assume is mostly due to the
> >>>>>> Flink 1.9 feature freeze and release testing effort.
> >>>>>>
> >>>>>> I personally still recognize this issue as an important one to be solved.
> >>>>>> I'd be happy to help resume this discussion soon (after the 1.9 release)
> >>>>>> and see if we can take some steps towards this in Flink 1.10.
> >>>>>>
> >>>>>> Best,
> >>>>>> Stephan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>>>>>
> >>>>>>> That's exactly what I suggested a long time ago: the Flink REST client
> >>>>>>> should not require any Flink dependency, only an HTTP library to call the
> >>>>>>> REST services to submit and monitor a job.
> >>>>>>> What I also suggested in [1] was to have a way to automatically suggest
> >>>>>>> to the user (via a UI) the available main classes and their required
> >>>>>>> parameters[2].
> >>>>>>> Another problem we have with Flink is that the REST client and the CLI
> >>>>>>> one behave differently, and we use the CLI client (via ssh) because it
> >>>>>>> allows calling some other method after env.execute() [3] (we have to call
> >>>>>>> another REST service to signal the end of the job).
> >>>>>>> In this regard, a dedicated interface, like the JobListener suggested in
> >>>>>>> the previous emails, would be very helpful (IMHO).
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
> >>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-10862
> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-10879
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Flavio
> >>>>>>>
> >>>>>>> On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi, Tison,
> >>>>>>>>
> >>>>>>>> Thanks for your comments. Overall I agree with you that it is difficult
> >>>>>>>> for downstream projects to integrate with Flink and that we need to
> >>>>>>>> refactor the current Flink client API.
> >>>>>>>> I also agree that CliFrontend should only parse command line arguments
> >>>>>>>> and then pass them to ExecutionEnvironment. It is ExecutionEnvironment's
> >>>>>>>> responsibility to compile the job, create the cluster, and submit the job.
> >>>>>>>> Besides that, Flink currently has many ExecutionEnvironment
> >>>>>>>> implementations, and Flink will use a specific one based on the context.
> >>>>>>>> IMHO this is not necessary; ExecutionEnvironment should be able to do the
> >>>>>>>> right thing based on the FlinkConf it receives. Too many
> >>>>>>>> ExecutionEnvironment implementations are another burden for downstream
> >>>>>>>> project integration.
> >>>>>>>>
> >>>>>>>> One thing I'd like to mention is Flink's Scala shell and SQL client:
> >>>>>>>> although they are sub-modules of Flink, they could be treated as
> >>>>>>>> downstream projects which use Flink's client API. Currently you will find
> >>>>>>>> that it is not easy for them to integrate with Flink, and they share a lot
> >>>>>>>> of duplicated code. It is another sign that we should refactor the Flink
> >>>>>>>> client API.
> >>>>>>>>
> >>>>>>>> I believe it is a large and hard change, and I am afraid we cannot keep
> >>>>>>>> compatibility since many of the changes are user facing.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Jun 24, 2019 at 2:53 PM Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> After a closer look at our client APIs, I can see there are two major
> >>>>>>>>> issues for consistency and integration: the different deployment of
> >>>>>>>>> job clusters, which couples job graph creation and cluster deployment,
> >>>>>>>>> and submission via CliFrontend, which confuses the control flow of job
> >>>>>>>>> graph compilation and job submission. I'd like to follow the discussion
> >>>>>>>>> above, mainly the process described by Jeff and Stephan, and share my
> >>>>>>>>> ideas on these issues.
> >>>>>>>>>
> >>>>>>>>> 1) CliFrontend confuses the control flow of job compilation and
> >>>>>>>>> submission.
> >>>>>>>>> Following the process of job submission Stephan and Jeff described, the
> >>>>>>>>> execution environment knows all configs of the cluster and the
> >>>>>>>>> topology/settings of the job. Ideally, the main method of the user
> >>>>>>>>> program calls #execute (or #submit), and Flink deploys the cluster,
> >>>>>>>>> compiles the job graph and submits it to the cluster. However, the
> >>>>>>>>> current CliFrontend does all these things inside its #runProgram method,
> >>>>>>>>> which introduces a lot of subclasses of (stream) execution environment.
> >>>>>>>>>
> >>>>>>>>> Actually, it sets up an exec env that hijacks the #execute/executePlan
> >>>>>>>>> method, initializes the job graph and aborts execution. Then control
> >>>>>>>>> flows back to CliFrontend, which deploys the cluster (or retrieves
> >>>>>>>>> the client) and submits the job graph. This is quite a specific internal
> >>>>>>>>> process inside Flink and is not consistent with anything else.
> >>>>>>>>>
> >>>>>>>>> 2) Deployment of job cluster couples job graph creation and cluster
> >>>>>>>>> deployment. Abstractly, going from a user job to a concrete submission
> >>>>>>>>> requires
> >>>>>>>>>
> >>>>>>>>>     create JobGraph --\
> >>>>>>>>>
> >>>>>>>>> create ClusterClient -->  submit JobGraph
> >>>>>>>>>
> >>>>>>>>> such a dependency. The ClusterClient is created by deploying or
> >>>>>>>>> retrieving. JobGraph submission requires a compiled JobGraph and a valid
> >>>>>>>>> ClusterClient, but the creation of the ClusterClient is abstractly
> >>>>>>>>> independent of that of the JobGraph. However, in job cluster mode, we
> >>>>>>>>> deploy the job cluster with a job graph, which means we use another
> >>>>>>>>> process:
> >>>>>>>>>
> >>>>>>>>> create JobGraph --> deploy cluster with the JobGraph
> >>>>>>>>>
> >>>>>>>>> Here is another inconsistency, and downstream projects/client APIs are
> >>>>>>>>> forced to handle different cases with little support from Flink.
> >>>>>>>>>
> >>>>>>>>> Since we have likely reached a consensus on
> >>>>>>>>>
> >>>>>>>>> 1. all configs are gathered by the Flink configuration and passed along
> >>>>>>>>> 2. the execution environment knows all configs and handles execution
> >>>>>>>>> (both deployment and submission)
> >>>>>>>>>
> >>>>>>>>> I propose eliminating the inconsistencies above with the following
> >>>>>>>>> approach:
> >>>>>>>>>
> >>>>>>>>> 1) CliFrontend should be exactly a front end, at least for the "run"
> >>>>>>>>> command. That means it just gathers all config from the command line
> >>>>>>>>> and passes it to the main method of the user program. The execution
> >>>>>>>>> environment knows all the info, and with an additional utility for
> >>>>>>>>> ClusterClient we gracefully get a ClusterClient by deploying or
> >>>>>>>>> retrieving. In this way, we don't need to hijack the
> >>>>>>>>> #execute/executePlan methods and can remove the various hacky
> >>>>>>>>> subclasses of exec env, as well as the #run methods in ClusterClient
> >>>>>>>>> (for an interface-ized ClusterClient). Now the control flow goes from
> >>>>>>>>> CliFrontend to the main method and never returns.
> >>>>>>>>>
> >>>>>>>>> 2) Job cluster means a cluster for the specific job. From another
> >>>>>>>>> perspective, it is an ephemeral session. We may decouple the deployment
> >>>>>>>>> from a compiled job graph, and instead start a session with an idle
> >>>>>>>>> timeout and submit the job afterwards.
> >>>>>>>>>
> >>>>>>>>> Before we go into more details on design or implementation, it is
> >>>>>>>>> better to be aware of these topics and discuss them to reach a
> >>>>>>>>> consensus.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> tison.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 20, 2019 at 3:21 AM Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Jeff,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for raising this thread and the design document!
> >>>>>>>>>>
> >>>>>>>>>> As @Thomas Weise mentioned above, extending config to Flink
> >>>>>>>>>> requires far more effort than it should. Another example
> >>>>>>>>>> is that we achieve detached mode by introducing another execution
> >>>>>>>>>> environment which also hijacks the #execute method.
> >>>>>>>>>>
> >>>>>>>>>> I agree with your idea that the user would configure all things
> >>>>>>>>>> and Flink "just" respects it. On this topic I think the unusual
> >>>>>>>>>> control flow when CliFrontend handles the "run" command is the problem.
> >>>>>>>>>> It handles several configs, mainly about cluster settings, and
> >>>>>>>>>> thus the main method of the user program is unaware of them. Also, it
> >>>>>>>>>> compiles the app to a job graph by running the main method with a
> >>>>>>>>>> hijacked exec env, which constrains the main method further.
> >>>>>>>>>>
> >>>>>>>>>> I'd like to write down a few notes on passing and respecting
> >>>>>>>>>> configs/args, as well as decoupling job compilation and submission,
> >>>>>>>>>> and share them on this thread later.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> tison.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Jun 17, 2019 at 7:29 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Jeff and Flavio,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks Jeff a lot for proposing the design document.
> >>>>>>>>>>>
> >>>>>>>>>>> We are also working on refactoring ClusterClient to allow flexible and
> >>>>>>>>>>> efficient job management in our real-time platform.
> >>>>>>>>>>> We would like to draft a document to share our ideas with you.
> >>>>>>>>>>>
> >>>>>>>>>>> I think it's a good idea to have something like Apache Livy for Flink,
> >>>>>>>>>>> and the efforts discussed here will take a great step forward to it.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Xiaogang
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jun 17, 2019 at 7:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Is there any possibility to have something like Apache Livy [1] also
> >>>>>>>>>>>> for Flink in the future?
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://livy.apache.org/
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> >>> Any API we expose should not have dependencies on the runtime
> >>>>>>>>>>>>> (flink-runtime) package or other implementation details. To me, this
> >>>>>>>>>>>>> means that the current ClusterClient cannot be exposed to users because
> >>>>>>>>>>>>> it uses quite some classes from the optimiser and runtime packages.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We should change ClusterClient from a class to an interface.
> >>>>>>>>>>>>> ExecutionEnvironment would only use the ClusterClient interface, which
> >>>>>>>>>>>>> should be in flink-clients, while the concrete implementation class
> >>>>>>>>>>>>> could be in flink-runtime.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> >>> What happens when a failure/restart in the client happens? There
> >>>>>>>>>>>>> needs to be a way of re-establishing the connection to the job, set up
> >>>>>>>>>>>>> the listeners again, etc.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Good point. First we need to define what failure/restart in the
> >>>>>>>>>>>>> client means. IIUC, that usually means a network failure, which will
> >>>>>>>>>>>>> happen in class RestClient. If my understanding is correct, the
> >>>>>>>>>>>>> restart/retry mechanism should be implemented in RestClient.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Some points to consider:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> * Any API we expose should not have dependencies on the runtime
> >>>>>>>>>>>>>> (flink-runtime) package or other implementation details. To me, this
> >>>>>>>>>>>>>> means that the current ClusterClient cannot be exposed to users because
> >>>>>>>>>>>>>> it uses quite some classes from the optimiser and runtime packages.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> * What happens when a failure/restart in the client happens? There need
> >>>>>>>>>>>>>> to be a way of re-establishing the connection to the job, set up the
> >>>>>>>>>>>>>> listeners again, etc.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Sorry folks, the design doc is later than you expected. Here's the
> >>>>>>>>>>>>>>> design doc I drafted; any comments and feedback are welcome.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nice that this discussion is happening.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the FLIP, we could also revisit the entire role of the environments
> >>>>>>>>>>>>>>>> again.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Initially, the idea was:
> >>>>>>>>>>>>>>>> - the environments take care of the specific setup for standalone (no
> >>>>>>>>>>>>>>>> setup needed), yarn, mesos, etc.
> >>>>>>>>>>>>>>>> - the session ones have control over the session. The environment holds
> >>>>>>>>>>>>>>>> the session client.
> >>>>>>>>>>>>>>>> - running a job gives a "control" object for that job. That behavior is
> >>>>>>>>>>>>>>>> the same in all environments.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The actual implementation diverged quite a bit from that. Happy to see a
> >>>>>>>>>>>>>>>> discussion about straightening this out a bit more.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi folks,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Sorry for the late response. It seems we have reached consensus on this;
> >>>>>>>>>>>>>>>>> I will create a FLIP for this with a more detailed design.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Great to see this discussion seeded! The problems you face with the
> >>>>>>>>>>>>>>>>>> Zeppelin integration are also affecting other downstream projects, like
> >>>>>>>>>>>>>>>>>> Beam.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> We just enabled the savepoint restore option in RemoteStreamEnvironment
> >>>>>>>>>>>>>>>>>> [1] and that was more difficult than it should be. The main issue is that
> >>>>>>>>>>>>>>>>>> environment and cluster client aren't decoupled. Ideally it should be
> >>>>>>>>>>>>>>>>>> possible to just get the matching cluster client from the environment and
> >>>>>>>>>>>>>>>>>> then control the job through it (environment as factory for cluster
> >>>>>>>>>>>>>>>>>> client). But note that the environment classes are part of the public
> >>>>>>>>>>>>>>>>>> API, and it is not straightforward to make larger changes without
> >>>>>>>>>>>>>>>>>> breaking backward compatibility.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> ClusterClient currently exposes internal classes like JobGraph and
> >>>>>>>>>>>>>>>>>> StreamGraph. But it should be possible to wrap this with a new public
> >>>>>>>>>>>>>>>>>> API that brings the required job control capabilities for downstream
> >>>>>>>>>>>>>>>>>> projects. Perhaps it is helpful to look at some of the interfaces in
> >>>>>>>>>>>>>>>>>> Beam while thinking about this: [2] for the portable job API and [3] for
> >>>>>>>>>>>>>>>>>> the old asynchronous job control from the Beam Java SDK.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The backward compatibility discussion [4] is also relevant here. A new
> >>>>>>>>>>>>>>>>>> API should shield downstream projects from internals and allow them to
> >>>>>>>>>>>>>>>>>> interoperate with multiple future Flink versions in the same release
> >>>>>>>>>>>>>>>>>> line without forced upgrades.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/7249
> >>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> >>>>>>>>>>>>>>>>>> [3]
> >>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> >>>>>>>>>>>>>>>>>> [4]
> >>>>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> >>> I'm not so sure whether the user should be able to define where the
> >>>>>>>>>>>>>>>>>>> job runs (in your example Yarn). This is actually independent of the job
> >>>>>>>>>>>>>>>>>>> development and is something which is decided at deployment time.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Users don't need to specify the execution mode programmatically. They can
> >>>>>>>>>>>>>>>>>>> also pass the execution mode via the arguments of the flink run command,
> >>>>>>>>>>>>>>>>>>> e.g.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> bin/flink run -m yarn-cluster ....
> >>>>>>>>>>>>>>>>>>> bin/flink run -m local ...
> >>>>>>>>>>>>>>>>>>> bin/flink run -m host:port ...
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Does this make sense to you ?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> >>> To me it makes sense that the ExecutionEnvironment is not directly
> >>>>>>>>>>>>>>>>>>> initialized by the user and instead context sensitive how you want to
> >>>>>>>>>>>>>>>>>>> execute your job (Flink CLI vs. IDE, for example).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Right, currently I notice Flink creates a different
> >>>>>>>>>>>>>>>>>>> ContextExecutionEnvironment based on the submission scenario (Flink
> >>>>>>>>>>>>>>>>>>> CLI vs. IDE). To me this is kind of a hacky approach, not so
> >>>>>>>>>>>>>>>>>>> straightforward. What I suggested above is that Flink should always
> >>>>>>>>>>>>>>>>>>> create the same ExecutionEnvironment but with a different configuration,
> >>>>>>>>>>>>>>>>>>> and based on the configuration it would create the proper ClusterClient
> >>>>>>>>>>>>>>>>>>> for the different behaviors.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> You are probably right that we have code duplication when
> >>>>>>>>>>>>>>>>>>>> it comes to the creation of the ClusterClient. This should be
> >>>>>>>>>>>>>>>>>>>> reduced in the future.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should be able to define
> >>>>>>>>>>>>>>>>>>>> where the job runs (in your example Yarn). This is actually
> >>>>>>>>>>>>>>>>>>>> independent of the job development and is something which is
> >>>>>>>>>>>>>>>>>>>> decided at deployment time. To me it makes sense that the
> >>>>>>>>>>>>>>>>>>>> ExecutionEnvironment is not directly initialized by the user
> >>>>>>>>>>>>>>>>>>>> and instead context sensitive how you want to execute your
> >>>>>>>>>>>>>>>>>>>> job (Flink CLI vs. IDE, for example). However, I agree that
> >>>>>>>>>>>>>>>>>>>> the ExecutionEnvironment should give you access to the
> >>>>>>>>>>>>>>>>>>>> ClusterClient and to the job (maybe in the form of the
> >>>>>>>>>>>>>>>>>>>> JobGraph or a job plan).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Till,
> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. You are right that I expect a
> >>>>>>>>>>>>>>>>>>>>> better programmatic job submission/control API which could
> >>>>>>>>>>>>>>>>>>>>> be used by downstream projects, and it would benefit the
> >>>>>>>>>>>>>>>>>>>>> Flink ecosystem. When I look at the code of the Flink
> >>>>>>>>>>>>>>>>>>>>> scala-shell and sql-client (I believe they are not the core
> >>>>>>>>>>>>>>>>>>>>> of Flink, but belong to the ecosystem of Flink), I find a
> >>>>>>>>>>>>>>>>>>>>> lot of duplicated code for creating a ClusterClient from
> >>>>>>>>>>>>>>>>>>>>> user-provided configuration (the configuration format may
> >>>>>>>>>>>>>>>>>>>>> differ between scala-shell and sql-client) and then using
> >>>>>>>>>>>>>>>>>>>>> that ClusterClient to manipulate jobs. I don't think this is
> >>>>>>>>>>>>>>>>>>>>> convenient for downstream projects. What I expect is that a
> >>>>>>>>>>>>>>>>>>>>> downstream project only needs to provide the necessary
> >>>>>>>>>>>>>>>>>>>>> configuration info (maybe by introducing a class FlinkConf),
> >>>>>>>>>>>>>>>>>>>>> and then builds an ExecutionEnvironment based on this
> >>>>>>>>>>>>>>>>>>>>> FlinkConf, and the ExecutionEnvironment will create the
> >>>>>>>>>>>>>>>>>>>>> proper ClusterClient. This would not only benefit downstream
> >>>>>>>>>>>>>>>>>>>>> project development but also help their integration tests
> >>>>>>>>>>>>>>>>>>>>> with Flink. Here's a sample code snippet of what I expect.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> val conf = new FlinkConf().mode("yarn")
> >>>>>>>>>>>>>>>>>>>>> val env = new ExecutionEnvironment(conf)
> >>>>>>>>>>>>>>>>>>>>> val jobId = env.submit(...)
> >>>>>>>>>>>>>>>>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> >>>>>>>>>>>>>>>>>>>>> env.getClusterClient().cancelJob(jobId)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> What do you think ?
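A rough Java sketch of the configuration-driven idea in the snippet above, offered only as an illustration. Everything here is hypothetical: `FlinkConf` is the class name floated in the thread and not an existing Flink class, and the one-method `ClusterClient` interface is a reduced stand-in that does not match Flink's real ClusterClient. The only point is that the caller supplies a mode and a single factory picks the client.

```java
class ConfDrivenClientSketch {

    /** Hypothetical stand-in for the FlinkConf idea; not an existing Flink class. */
    static final class FlinkConf {
        private String mode = "local";

        FlinkConf mode(String mode) {
            this.mode = mode;
            return this;
        }

        String mode() {
            return mode;
        }
    }

    /** Hypothetical, heavily reduced client; the real ClusterClient API differs. */
    interface ClusterClient {
        String target();
    }

    /** Single place that maps configuration to a client implementation. */
    static ClusterClient createClient(FlinkConf conf) {
        final String mode = conf.mode();
        if (mode.equals("local")) {
            return () -> "local";
        }
        if (mode.equals("yarn")) {
            return () -> "yarn";
        }
        // Anything shaped like host:port is treated as an existing remote cluster.
        if (mode.matches("[^:]+:\\d+")) {
            return () -> "remote " + mode;
        }
        throw new IllegalArgumentException("Unsupported mode: " + mode);
    }

    public static void main(String[] args) {
        ClusterClient client = createClient(new FlinkConf().mode("yarn"));
        System.out.println(client.target());
    }
}
```

With this shape, scala-shell, sql-client and the CLI would each build a configuration object and share one factory, which is the duplication the message above complains about.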
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Jeff,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> what you are proposing is to provide the user with better
> >>>>>>>>>>>>>>>>>>>>>> programmatic job control. There was actually an effort to
> >>>>>>>>>>>>>>>>>>>>>> achieve this but it has never been completed [1]. However,
> >>>>>>>>>>>>>>>>>>>>>> there are some improvements in the code base now. Look for
> >>>>>>>>>>>>>>>>>>>>>> example at the NewClusterClient interface which offers a
> >>>>>>>>>>>>>>>>>>>>>> non-blocking job submission. But I agree that we need to
> >>>>>>>>>>>>>>>>>>>>>> improve Flink in this regard.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I would not be in favour of exposing all ClusterClient
> >>>>>>>>>>>>>>>>>>>>>> calls via the ExecutionEnvironment because it would clutter
> >>>>>>>>>>>>>>>>>>>>>> the class and would not be a good separation of concerns.
> >>>>>>>>>>>>>>>>>>>>>> Instead one idea could be to retrieve the current
> >>>>>>>>>>>>>>>>>>>>>> ClusterClient from the ExecutionEnvironment which can then
> >>>>>>>>>>>>>>>>>>>>>> be used for cluster and job control. But before we start an
> >>>>>>>>>>>>>>>>>>>>>> effort here, we need to agree and capture what
> >>>>>>>>>>>>>>>>>>>>>> functionality we want to provide.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> >>>>>>>>>>>>>>>>>>>>>> describing how to talk to a cluster manager like Yarn or
> >>>>>>>>>>>>>>>>>>>>>> Mesos. The ClusterDescriptor can be used for deploying
> >>>>>>>>>>>>>>>>>>>>>> Flink clusters (job and session) and gives you a
> >>>>>>>>>>>>>>>>>>>>>> ClusterClient. The ClusterClient controls the cluster (e.g.
> >>>>>>>>>>>>>>>>>>>>>> submitting jobs, listing all running jobs). And then there
> >>>>>>>>>>>>>>>>>>>>>> was the idea to introduce a JobClient which you obtain from
> >>>>>>>>>>>>>>>>>>>>>> the ClusterClient to trigger job-specific operations (e.g.
> >>>>>>>>>>>>>>>>>>>>>> taking a savepoint, cancelling the job).
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>> Till
> >>>>>>>>>>>>>>>>>>>>>>
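The layering described in the message above (ClusterDescriptor gives you a ClusterClient, which gives you a JobClient) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Flink's actual API: the interface names come from the thread, but every method signature and the `InMemoryCluster` class are made up here so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LayeredClientSketch {

    /** Knows how to reach a cluster manager and deploy a cluster (signature is hypothetical). */
    interface ClusterDescriptor {
        ClusterClient deploySessionCluster();
    }

    /** Cluster-level operations (signatures are hypothetical). */
    interface ClusterClient {
        JobClient submitJob(String jobName);

        List<String> listRunningJobs();
    }

    /** Job-level operations, scoped to one submitted job (signatures are hypothetical). */
    interface JobClient {
        String jobId();

        String triggerSavepoint(String targetDirectory);

        void cancel();
    }

    /** Toy in-memory cluster that plays both the descriptor and the cluster client role. */
    static final class InMemoryCluster implements ClusterDescriptor, ClusterClient {
        private final Map<String, String> running = new LinkedHashMap<>();
        private int nextId = 1;

        @Override
        public ClusterClient deploySessionCluster() {
            return this;
        }

        @Override
        public JobClient submitJob(String jobName) {
            final String id = "job-" + nextId++;
            running.put(id, jobName);
            return new JobClient() {
                @Override
                public String jobId() {
                    return id;
                }

                @Override
                public String triggerSavepoint(String targetDirectory) {
                    // Only builds a path; a real client would ask the cluster to snapshot state.
                    return targetDirectory + "/savepoint-" + id;
                }

                @Override
                public void cancel() {
                    running.remove(id);
                }
            };
        }

        @Override
        public List<String> listRunningJobs() {
            return new ArrayList<>(running.keySet());
        }
    }
}
```

The job-specific calls live on the object returned by submission, so the ExecutionEnvironment would only need to hand out a client rather than grow cancel and savepoint methods of its own.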
> >>>>>>>>>>>>>>>>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Folks,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which
> >>>>>>>>>>>>>>>>>>>>>>> is an interactive notebook, and I hit several issues that
> >>>>>>>>>>>>>>>>>>>>>>> are caused by the Flink client API. So I'd like to propose
> >>>>>>>>>>>>>>>>>>>>>>> the following changes to the Flink client API.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 1. Support nonblocking execution. Currently,
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment#execute is a blocking method which
> >>>>>>>>>>>>>>>>>>>>>>> does two things: it first submits the job and then waits
> >>>>>>>>>>>>>>>>>>>>>>> until the job is finished. I'd like to introduce a
> >>>>>>>>>>>>>>>>>>>>>>> nonblocking execution method like
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment#submit which only submits the job and
> >>>>>>>>>>>>>>>>>>>>>>> then returns the jobId to the client, and allow the user
> >>>>>>>>>>>>>>>>>>>>>>> to query the job status via the jobId.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 2. Add a cancel API in
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. Currently
> >>>>>>>>>>>>>>>>>>>>>>> the only way to cancel a job is via the CLI (bin/flink),
> >>>>>>>>>>>>>>>>>>>>>>> which is not convenient for downstream projects. So I'd
> >>>>>>>>>>>>>>>>>>>>>>> like to add a cancel API in ExecutionEnvironment.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 3. Add a savepoint API in
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. It is
> >>>>>>>>>>>>>>>>>>>>>>> similar to the cancel API; we should use
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment as the unified API for third parties
> >>>>>>>>>>>>>>>>>>>>>>> to integrate with Flink.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 4. Add a listener for the job execution lifecycle,
> >>>>>>>>>>>>>>>>>>>>>>> something like the following, so that downstream projects
> >>>>>>>>>>>>>>>>>>>>>>> can run custom logic during the lifecycle of a job. E.g.
> >>>>>>>>>>>>>>>>>>>>>>> Zeppelin would capture the jobId after the job is
> >>>>>>>>>>>>>>>>>>>>>>> submitted and then use this jobId to cancel it later when
> >>>>>>>>>>>>>>>>>>>>>>> necessary.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> public interface JobListener {
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>  void onJobSubmitted(JobID jobId);
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>  void onJobExecuted(JobExecutionResult jobResult);
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>  void onJobCanceled(JobID jobId);
> >>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 5. Enable session in ExecutionEnvironment. Currently it is
> >>>>>>>>>>>>>>>>>>>>>>> disabled, but session is very convenient for third parties
> >>>>>>>>>>>>>>>>>>>>>>> that submit jobs continually. I hope Flink can enable it
> >>>>>>>>>>>>>>>>>>>>>>> again.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 6. Unify all Flink client APIs into
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This is a long term issue which needs more careful
> >>>>>>>>>>>>>>>>>>>>>>> thinking and design. Currently some features of Flink are
> >>>>>>>>>>>>>>>>>>>>>>> exposed in ExecutionEnvironment/StreamExecutionEnvironment,
> >>>>>>>>>>>>>>>>>>>>>>> but some are exposed in the CLI instead of the API, like
> >>>>>>>>>>>>>>>>>>>>>>> the cancel and savepoint I mentioned above. I think the
> >>>>>>>>>>>>>>>>>>>>>>> root cause is that Flink didn't unify the interaction with
> >>>>>>>>>>>>>>>>>>>>>>> Flink. Here I list 3 scenarios of Flink operation:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>  - Local job execution. Flink will create LocalEnvironment
> >>>>>>>>>>>>>>>>>>>>>>>    and then use this LocalEnvironment to create
> >>>>>>>>>>>>>>>>>>>>>>>    LocalExecutor for job execution.
> >>>>>>>>>>>>>>>>>>>>>>>  - Remote job execution. Flink will create ClusterClient
> >>>>>>>>>>>>>>>>>>>>>>>    first and then create ContextEnvironment based on the
> >>>>>>>>>>>>>>>>>>>>>>>    ClusterClient and then run the job.
> >>>>>>>>>>>>>>>>>>>>>>>  - Job cancelation. Flink will create ClusterClient first
> >>>>>>>>>>>>>>>>>>>>>>>    and then cancel this job via this ClusterClient.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> As you can see, in the above 3 scenarios Flink doesn't use
> >>>>>>>>>>>>>>>>>>>>>>> the same approach (code path) to interact with Flink.
> >>>>>>>>>>>>>>>>>>>>>>> What I propose is the following:
> >>>>>>>>>>>>>>>>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment
> >>>>>>>>>>>>>>>>>>>>>>> (based on user configuration) --> Use this Environment to
> >>>>>>>>>>>>>>>>>>>>>>> create the proper ClusterClient (LocalClusterClient or
> >>>>>>>>>>>>>>>>>>>>>>> RestClusterClient) to interact with Flink (job execution
> >>>>>>>>>>>>>>>>>>>>>>> or cancelation).
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This way we can unify the process of local execution and
> >>>>>>>>>>>>>>>>>>>>>>> remote execution. And it is much easier for third parties
> >>>>>>>>>>>>>>>>>>>>>>> to integrate with Flink, because ExecutionEnvironment is
> >>>>>>>>>>>>>>>>>>>>>>> the unified entry point for Flink. All a third party needs
> >>>>>>>>>>>>>>>>>>>>>>> to do is pass configuration to ExecutionEnvironment, and
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment will do the right thing based on the
> >>>>>>>>>>>>>>>>>>>>>>> configuration. The Flink CLI can also be considered a
> >>>>>>>>>>>>>>>>>>>>>>> Flink API consumer: it just passes the configuration to
> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment and lets ExecutionEnvironment create
> >>>>>>>>>>>>>>>>>>>>>>> the proper ClusterClient instead of letting the CLI create
> >>>>>>>>>>>>>>>>>>>>>>> the ClusterClient directly.
> >>>>>>>>>>>>>>>>>>>>>>>
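To make items 1, 2 and 4 above concrete, here is a small Java sketch of a nonblocking submit with a lifecycle listener and a cancel call. It is an illustration only: the `Environment` class, its methods and the status strings are hypothetical, and the listener uses plain String ids and results where the proposal uses Flink's JobID and JobExecutionResult, so that the example compiles without Flink on the classpath. `completeJob` simulates the cluster reporting that a job finished.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class NonBlockingSubmitSketch {

    /** Same shape as the proposed JobListener, with String stand-ins for Flink types. */
    interface JobListener {
        void onJobSubmitted(String jobId);

        void onJobExecuted(String jobId, String result);

        void onJobCanceled(String jobId);
    }

    /** Hypothetical environment; not Flink's ExecutionEnvironment. */
    static final class Environment {
        private final List<JobListener> listeners = new ArrayList<>();
        private final Map<String, CompletableFuture<String>> jobs = new HashMap<>();
        private int nextId = 1;

        void addJobListener(JobListener listener) {
            listeners.add(listener);
        }

        /** Returns the job id immediately instead of waiting for the job to finish. */
        String submit(String jobName) {
            String id = "job-" + nextId++;
            jobs.put(id, new CompletableFuture<>());
            for (JobListener l : listeners) {
                l.onJobSubmitted(id);
            }
            return id;
        }

        /** Simulates the cluster reporting completion of a job. */
        void completeJob(String jobId, String result) {
            CompletableFuture<String> future = jobs.get(jobId);
            if (future != null && future.complete(result)) {
                for (JobListener l : listeners) {
                    l.onJobExecuted(jobId, result);
                }
            }
        }

        /** Cancels a job that has not finished yet; returns whether it was cancelled. */
        boolean cancel(String jobId) {
            CompletableFuture<String> future = jobs.get(jobId);
            boolean cancelled = future != null && future.cancel(true);
            if (cancelled) {
                for (JobListener l : listeners) {
                    l.onJobCanceled(jobId);
                }
            }
            return cancelled;
        }

        /** Status lookup by job id, as asked for in item 1. */
        String status(String jobId) {
            CompletableFuture<String> future = jobs.get(jobId);
            if (future == null) {
                return "UNKNOWN";
            }
            if (future.isCancelled()) {
                return "CANCELED";
            }
            return future.isDone() ? "FINISHED" : "RUNNING";
        }
    }
}
```

A notebook integration such as Zeppelin could register one listener, remember the id passed to `onJobSubmitted`, and call `cancel` with it when the user stops a paragraph.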
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 6 would involve large code refactoring, so I think we can
> >>>>>>>>>>>>>>>>>>>>>>> defer it to a future release; 1, 2, 3, 4 and 5 could be
> >>>>>>>>>>>>>>>>>>>>>>> done at once I believe. Let me know your comments and
> >>>>>>>>>>>>>>>>>>>>>>> feedback, thanks.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Best Regards
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Jeff Zhang
> >>>>>>>>>>>>>>>>>>>>>>>
>
