Hi all,

On the topic of web submission, I agree with Till that it only seems
to complicate things. It is bad for security and job isolation (anybody
can submit/cancel jobs), and its implementation complicates some parts
of the code. So, if we were to redesign the WebUI, maybe this part could
be left out. In addition, I would say that the ability to cancel jobs
could also be left out.

I would also be in favour of removing the "detached" mode, for the
reasons mentioned above (i.e. because we will now have a future
representing the result, on which the user can choose to wait or not).
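
To illustrate the point about the future, here is a minimal sketch in
plain Java. The names (DetachedSketch, JobResult, execute) are made up
for this example and are not Flink APIs, and the "job" is only simulated
by a local asynchronous task.

```java
import java.util.concurrent.CompletableFuture;

public class DetachedSketch {

    /** Hypothetical stand-in for whatever a finished job reports back. */
    public static final class JobResult {
        public final String jobId;
        public final long netRuntimeMillis;

        public JobResult(String jobId, long netRuntimeMillis) {
            this.jobId = jobId;
            this.netRuntimeMillis = netRuntimeMillis;
        }
    }

    /**
     * Hypothetical execute(): returns immediately with a future of the result.
     * The job itself is simulated by a local asynchronous task.
     */
    public static CompletableFuture<JobResult> execute(String jobName) {
        return CompletableFuture.supplyAsync(() -> new JobResult(jobName + "-id", 42L));
    }

    public static void main(String[] args) {
        // "Attached" behaviour: the caller chooses to block on the future.
        JobResult result = execute("wordcount").join();
        System.out.println("finished " + result.jobId);

        // "Detached" behaviour: the caller simply does not wait. A callback is
        // optional and only runs if the JVM is still alive when the job finishes.
        CompletableFuture<JobResult> pending = execute("etl");
        pending.thenAccept(r -> System.out.println("finished " + r.jobId));
    }
}
```

With this shape, attached vs. detached is a decision made by the caller
at the call site and not a separate mode of the environment.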

As for separating job submission and cluster creation, I am in favour
of keeping both. Once again, the reasons are mentioned above by Stephan,
Till and Aljoscha, and Zili also seems to agree. They mainly have to do
with security, isolation and ease of resource management for the user,
who knows that "when my job is done, everything will be cleaned up".
This is also the experience you get when launching a process on your
local OS.
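
As a rough sketch of how both could sit behind one entry point, along
the lines of the interfaces in Till's mail quoted below: the interface
names are taken from that mail, while the method signatures, the use of
a String as a placeholder for the JobGraph, and the in-memory fake are
only assumptions for illustration.

```java
public class DeployerSketch {

    /** Handle to one running job (hypothetical surface). */
    public interface JobClient {
        String jobId();
        String mode();
    }

    /** Talks to an already running (session) cluster. */
    public interface ClusterClient {
        JobClient submitJob(String jobGraph);
    }

    /** Knows how to bring up a cluster, with or without a job. */
    public interface ClusterDeploymentDescriptor {
        ClusterClient deploySessionCluster();
        JobClient deployPerJobCluster(String jobGraph);
    }

    /** What the environment would hold instead of a ClusterClient. */
    public interface JobDeployer {
        JobClient deploy(String jobGraph);
    }

    /** Session mode: submit to an existing cluster. */
    public static JobDeployer sessionDeployer(ClusterClient client) {
        return client::submitJob;
    }

    /** Per-job mode: deploy a dedicated cluster together with the job. */
    public static JobDeployer perJobDeployer(ClusterDeploymentDescriptor descriptor) {
        return descriptor::deployPerJobCluster;
    }

    private static JobClient jobClient(String jobGraph, String mode) {
        return new JobClient() {
            @Override public String jobId() { return "job-of-" + jobGraph; }
            @Override public String mode() { return mode; }
        };
    }

    /** In-memory fake so the sketch runs without any cluster. */
    public static ClusterDeploymentDescriptor fakeDescriptor() {
        return new ClusterDeploymentDescriptor() {
            @Override public ClusterClient deploySessionCluster() {
                return jobGraph -> jobClient(jobGraph, "session");
            }
            @Override public JobClient deployPerJobCluster(String jobGraph) {
                return jobClient(jobGraph, "per-job");
            }
        };
    }

    public static void main(String[] args) {
        ClusterDeploymentDescriptor descriptor = fakeDescriptor();
        JobDeployer session = sessionDeployer(descriptor.deploySessionCluster());
        JobDeployer perJob = perJobDeployer(descriptor);
        System.out.println(session.deploy("wordcount").mode()); // prints "session"
        System.out.println(perJob.deploy("wordcount").mode());  // prints "per-job"
    }
}
```

The user code only ever sees JobDeployer.deploy(...), so the choice
between the two modes stays a matter of configuration.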

On whether the per-job mode should be excluded from returning a
JobClient, I believe that eventually it would be nice to allow users to
get back a JobClient. The reasons are that 1) I cannot find any
objective reason why the user experience should diverge, and 2) this
will be the way the user interacts with his running job. Assuming that
the necessary ports are open for the REST API to work, I think that the
JobClient can run against the REST API without problems. If the needed
ports are not open, then we are safe to not return a JobClient, as the
user explicitly chose to close all points of communication to his
running job.
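
A small sketch of what I mean, again with made-up names (JobClientSketch,
jobClientFor) and no real network I/O: the caller gets a JobClient when
the REST endpoint is reachable and nothing otherwise. The URL layout is
only illustrative.

```java
import java.util.Optional;

public class JobClientSketch {

    /** Hypothetical handle to one running job; same surface in every mode. */
    public interface JobClient {
        String jobId();
        String endpoint();
    }

    /**
     * Returns a REST-backed client if the REST port is reachable, and an
     * empty Optional if the user closed all points of communication.
     * Reachability is passed in as a flag here; no connection is attempted.
     */
    public static Optional<JobClient> jobClientFor(String id, String restAddress, boolean restReachable) {
        if (!restReachable) {
            return Optional.empty();
        }
        return Optional.of(new JobClient() {
            @Override public String jobId() { return id; }
            @Override public String endpoint() { return "http://" + restAddress + "/jobs/" + id; }
        });
    }

    public static void main(String[] args) {
        Optional<JobClient> open = jobClientFor("j1", "localhost:8081", true);
        Optional<JobClient> closed = jobClientFor("j1", "localhost:8081", false);
        System.out.println(open.map(JobClient::endpoint).orElse("no client"));
        System.out.println(closed.map(JobClient::endpoint).orElse("no client"));
    }
}
```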

On the topic of not hijacking "env.execute()" in order to get the Plan,
I definitely agree, but for the proposal of having a "compile()" method
in the env, I would like to have a better look at the existing code.

Cheers,
Kostas

On Fri, Aug 23, 2019 at 5:52 AM Zili Chen <wander4...@gmail.com> wrote:
>
> Hi Yang,
>
> It would be helpful if you check Stephan's last comment,
> which states that isolation is important.
>
> For per-job mode, we run a dedicated cluster (maybe it
> should have been a couple of JM and TMs during the FLIP-6
> design) for a specific job. Thus its processes are isolated
> from other jobs.
>
> In our case, there was a time when we suffered from multiple
> jobs submitted by different users that affected each
> other so that all of them ran into an error state. Also,
> running the client inside the cluster could save client
> resources to some extent.
>
> However, we also face several issues, as you mentioned:
> per-job mode always uses the parent classloader,
> and thus classloading issues occur.
>
> BTW, one can make an analogy between session/per-job mode
> in Flink and client/cluster mode in Spark.
>
> Best,
> tison.
>
>
> On Thu, Aug 22, 2019 at 11:25 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
> > From the user's perspective, the scope of a per-job cluster is really
> > confusing.
> >
> > If it means a Flink cluster with a single job, so that we get better
> > isolation, then it does not matter how we deploy the cluster: deploy it
> > directly (mode1), or start a Flink cluster and then submit the job
> > through the cluster client (mode2).
> >
> > Otherwise, if it just means deploying directly, how should we name mode2:
> > session with job, or something else?
> >
> > We could also benefit from mode2. Users could get the same isolation
> > as with mode1. The user code and dependencies will be loaded by the user
> > class loader to avoid class conflicts with the framework.
> >
> > Anyway, both submission modes are useful.
> > We just need to clarify the concepts.
> >
> >
> >
> >
> > Best,
> >
> > Yang
> >
> > On Tue, Aug 20, 2019 at 5:58 PM Zili Chen <wander4...@gmail.com> wrote:
> >
> > > Thanks for the clarification.
> > >
> > > The idea of a JobDeployer once came to my mind when I was puzzling over
> > > how to execute per-job mode and session mode with the same user code
> > > and framework codepath.
> > >
> > > With the JobDeployer concept we are back to the statement that the
> > > environment knows every config of cluster deployment and job submission.
> > > We configure, or generate from configuration, a specific JobDeployer in
> > > the environment, and then the code aligns on
> > >
> > > *JobClient client = env.execute().get();*
> > >
> > > which in session mode is returned by clusterClient.submitJob and in per-job
> > > mode is returned by clusterDescriptor.deployJobCluster.
> > >
> > > Here comes a problem: currently we directly run ClusterEntrypoint
> > > with the extracted job graph. Following the JobDeployer way, we had better
> > > align the entry point of per-job deployment at JobDeployer. Users run
> > > their main method directly or via a CLI (which finally calls the main
> > > method) to deploy the job cluster.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > On Tue, Aug 20, 2019 at 4:40 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Till has made some good comments here.
> > > >
> > > > Two things to add:
> > > >
> > > >   - The job mode is very nice in the way that it runs the client inside the
> > > > cluster (in the same image/process that is the JM) and thus unifies both
> > > > applications and what the Spark world calls the "driver mode".
> > > >
> > > >   - Another thing I would add is that during the FLIP-6 design, we were
> > > > thinking about setups where Dispatcher and JobManager are separate
> > > > processes.
> > > >     A Yarn or Mesos Dispatcher of a session could run independently (even
> > > > as privileged processes executing no code).
> > > >     Then the "per-job" mode could still be helpful: when a job is
> > > > submitted to the dispatcher, it launches the JM again in a per-job mode, so
> > > > that JM and TM processes are bound to the job only. For higher security
> > > > setups, it is important that processes are not reused across jobs.
> > > >
> > > > On Tue, Aug 20, 2019 at 10:27 AM Till Rohrmann <trohrm...@apache.org>
> > > > wrote:
> > > >
> > > > > I would not be in favour of getting rid of the per-job mode since it
> > > > > simplifies the process of running Flink jobs considerably. Moreover, it is
> > > > > not only well suited for container deployments but also for deployments
> > > > > where you want to guarantee job isolation. For example, a user could use
> > > > > the per-job mode on Yarn to execute his job on a separate cluster.
> > > > >
> > > > > I think that having two notions of cluster deployments (session vs. per-job
> > > > > mode) does not necessarily contradict your ideas for the client api
> > > > > refactoring. For example one could have the following interfaces:
> > > > >
> > > > > - ClusterDeploymentDescriptor: encapsulates the logic how to deploy a
> > > > > cluster.
> > > > > - ClusterClient: allows to interact with a cluster
> > > > > - JobClient: allows to interact with a running job
> > > > >
> > > > > Now the ClusterDeploymentDescriptor could have two methods:
> > > > >
> > > > > - ClusterClient deploySessionCluster()
> > > > > - JobClusterClient/JobClient deployPerJobCluster(JobGraph)
> > > > >
> > > > > where JobClusterClient is either a supertype of ClusterClient which does
> > > > > not give you the functionality to submit jobs or deployPerJobCluster
> > > > > returns directly a JobClient.
> > > > >
> > > > > When setting up the ExecutionEnvironment, one would then not provide a
> > > > > ClusterClient to submit jobs but a JobDeployer which, depending on the
> > > > > selected mode, either uses a ClusterClient (session mode) to submit jobs or
> > > > > a ClusterDeploymentDescriptor to deploy a per-job mode cluster with the job
> > > > > to execute.
> > > > >
> > > > > These are just some thoughts on how one could make it work, because I
> > > > > believe there is some value in using the per-job mode from the
> > > > > ExecutionEnvironment.
> > > > >
> > > > > Concerning the web submission, this is indeed a bit tricky. From a cluster
> > > > > management standpoint, I would be in favour of not executing user code on
> > > > > the REST endpoint. Especially when considering security, it would be good to
> > > > > have a well defined cluster behaviour where it is explicitly stated where
> > > > > user code and, thus, potentially risky code is executed. Ideally we limit
> > > > > it to the TaskExecutor and JobMaster.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Aug 20, 2019 at 9:40 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > > >
> > > > > > In my opinion the client should not use any environment to get the Job
> > > > > > graph because the jar should reside ONLY on the cluster (and not in the
> > > > > > client classpath, otherwise there are always inconsistencies between the
> > > > > > client's and the Flink Job Manager's classpath).
> > > > > > In the YARN, Mesos and Kubernetes scenarios you have the jar but you could
> > > > > > start a cluster that has the jar on the Job Manager as well (but this is
> > > > > > the only case where I think you can assume that the client has the jar on
> > > > > > the classpath; in the REST job submission you don't have any classpath).
> > > > > >
> > > > > > Thus, always in my opinion, the JobGraph should be generated by the Job
> > > > > > Manager REST API.
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 20, 2019 at 9:00 AM Zili Chen <wander4...@gmail.com> wrote:
> > > > > >
> > > > > > >> I would like to involve Till & Stephan here to clarify some concepts of
> > > > > > >> per-job mode.
> > > > > > >>
> > > > > > >> The term per-job refers to one of the modes a cluster can run in. It is
> > > > > > >> mainly aimed at spawning a dedicated cluster for a specific job, where the
> > > > > > >> job can be packaged with Flink itself and thus the cluster is initialized
> > > > > > >> with the job, so that we get rid of a separate submission step.
> > > > > > >>
> > > > > > >> This is useful for container deployments where one creates his image with
> > > > > > >> the job and then simply deploys the container.
> > > > > > >>
> > > > > > >> However, it is out of client scope since a client (ClusterClient for
> > > > > > >> example) is for communicating with an existing cluster and performing
> > > > > > >> actions. Currently, in per-job mode, we extract the job graph and bundle it
> > > > > > >> into the cluster deployment, and thus no concept of a client gets involved.
> > > > > > >> It looks reasonable to exclude the deployment of per-job clusters from the
> > > > > > >> client api and use dedicated utility classes (deployers) for deployment.
> > > > > >>
> > > > > > >> On Tue, Aug 20, 2019 at 12:37 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hi Aljoscha,
> > > > > >> >
> > > > > > >> > Thanks for your reply and participation. The Google Doc you linked to
> > > > > > >> > requires permission and I think you could use a share link instead.
> > > > > > >> >
> > > > > > >> > I agree that we have almost reached a consensus that a JobClient is
> > > > > > >> > necessary to interact with a running Job.
> > > > > >> >
> > > > > >> > Let me check your open questions one by one.
> > > > > >> >
> > > > > > >> > 1. Separate cluster creation and job submission for per-job mode.
> > > > > > >> >
> > > > > > >> > As you mentioned, here is where the opinions diverge. In my document there
> > > > > > >> > is an alternative[2] that proposes excluding per-job deployment from the
> > > > > > >> > client api scope, and now I find it more reasonable that we do the exclusion.
> > > > > > >> >
> > > > > > >> > When in per-job mode, a dedicated JobCluster is launched to execute the
> > > > > > >> > specific job. It is more like a Flink Application than a submission
> > > > > > >> > of a Flink Job. The client only takes care of job submission and assumes
> > > > > > >> > there is an existing cluster. In this way we are able to consider per-job
> > > > > > >> > issues individually, and JobClusterEntrypoint would be the utility class for
> > > > > > >> > per-job deployment.
> > > > > > >> >
> > > > > > >> > Nevertheless, the user program works in both session mode and per-job mode
> > > > > > >> > without needing to change code. The JobClient in per-job mode is returned
> > > > > > >> > from env.execute as normal. However, it would no longer be a wrapper of
> > > > > > >> > RestClusterClient but a wrapper of PerJobClusterClient, which communicates
> > > > > > >> > with the Dispatcher locally.
> > > > > >> >
> > > > > >> > 2. How to deal with plan preview.
> > > > > >> >
> > > > > > >> > With env.compile functions users can get the JobGraph or FlinkPlan, and thus
> > > > > > >> > they can preview the plan programmatically. Typically it looks like
> > > > > >> >
> > > > > >> > if (preview configured) {
> > > > > >> >     FlinkPlan plan = env.compile();
> > > > > >> >     new JSONDumpGenerator(...).dump(plan);
> > > > > >> > } else {
> > > > > >> >     env.execute();
> > > > > >> > }
> > > > > >> >
> > > > > > >> > And `flink info` would no longer be valid.
> > > > > >> >
> > > > > >> > 3. How to deal with Jar Submission at the Web Frontend.
> > > > > >> >
> > > > > > >> > There is one more thread that talked about this topic[1]. Apart from
> > > > > > >> > removing the functions there are two alternatives.
> > > > > > >> >
> > > > > > >> > One is to introduce an interface that has a method returning
> > > > > > >> > JobGraph/FlinkPlan, with Jar Submission only supporting main classes that
> > > > > > >> > implement this interface, and then extract the JobGraph/FlinkPlan just by
> > > > > > >> > calling the method. In this way, it is even possible to consider a
> > > > > > >> > separation of job creation and job submission.
> > > > > > >> >
> > > > > > >> > The other is, as you mentioned, to let execute() do the actual execution.
> > > > > > >> > We won't execute the main method in the WebFrontend but spawn a process
> > > > > > >> > at the WebMonitor side to execute it. For the return part we could generate
> > > > > > >> > the JobID from the WebMonitor and pass it to the execution environment.
> > > > > >> >
> > > > > >> > 4. How to deal with detached mode.
> > > > > >> >
> > > > > > >> > I think detached mode is a temporary solution for non-blocking submission.
> > > > > > >> > In my document both submission and execution return a CompletableFuture, and
> > > > > > >> > users control whether or not to wait for the result. At this point we don't
> > > > > > >> > need a detached option, but the functionality is covered.
> > > > > > >> >
> > > > > > >> > 5. How does per-job mode interact with interactive programming.
> > > > > > >> >
> > > > > > >> > All of the YARN, Mesos and Kubernetes scenarios now follow the pattern of
> > > > > > >> > launching a JobCluster. And I don't think there would be inconsistency
> > > > > > >> > between different resource management systems.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > tison.
> > > > > >> >
> > > > > >> > [1]
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> > https://lists.apache.org/x/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
> > > > > >> > [2]
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?disco=AAAADZaGGfs
> > > > > >> >
> > > > > > >> > On Fri, Aug 16, 2019 at 9:20 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > >> >
> > > > > >> >> Hi,
> > > > > >> >>
> > > > > > >> >> I read both Jeff's initial design document and the newer document by
> > > > > > >> >> Tison. I also finally found the time to collect our thoughts on the issue.
> > > > > > >> >> I had quite some discussions with Kostas and this is the result: [1].
> > > > > > >> >>
> > > > > > >> >> I think overall we agree that this part of the code is in dire need of
> > > > > > >> >> some refactoring/improvements but I think there are still some open
> > > > > > >> >> questions and some differences in opinion what those refactorings should
> > > > > > >> >> look like.
> > > > > > >> >>
> > > > > > >> >> I think the API-side is quite clear, i.e. we need some JobClient API that
> > > > > > >> >> allows interacting with a running Job. It could be worthwhile to spin that
> > > > > > >> >> off into a separate FLIP because we can probably find consensus on that
> > > > > > >> >> part more easily.
> > > > > >> >>
> > > > > >> >> For the rest, the main open questions from our doc are these:
> > > > > >> >>
> > > > > > >> >>   - Do we want to separate cluster creation and job submission for
> > > > > > >> >> per-job mode? In the past, there were conscious efforts to *not* separate
> > > > > > >> >> job submission from cluster creation for per-job clusters for Mesos, YARN,
> > > > > > >> >> Kubernetes (see StandaloneJobClusterEntryPoint). Tison suggests in his
> > > > > > >> >> design document to decouple this in order to unify job submission.
> > > > > > >> >>
> > > > > > >> >>   - How to deal with plan preview, which needs to hijack execute() and
> > > > > > >> >> let the outside code catch an exception?
> > > > > > >> >>
> > > > > > >> >>   - How to deal with Jar Submission at the Web Frontend, which needs to
> > > > > > >> >> hijack execute() and let the outside code catch an exception?
> > > > > > >> >> CliFrontend.run() “hijacks” ExecutionEnvironment.execute() to get a
> > > > > > >> >> JobGraph and then execute that JobGraph manually. We could get around that
> > > > > > >> >> by letting execute() do the actual execution. One caveat for this is that
> > > > > > >> >> now the main() method doesn’t return (or is forced to return by throwing an
> > > > > > >> >> exception from execute()) which means that for Jar Submission from the
> > > > > > >> >> WebFrontend we have a long-running main() method running in the
> > > > > > >> >> WebFrontend. This doesn’t sound very good. We could get around this by
> > > > > > >> >> removing the plan preview feature and by removing Jar Submission/Running.
> > > > > > >> >>
> > > > > > >> >>   - How to deal with detached mode? Right now, DetachedEnvironment will
> > > > > > >> >> execute the job and return immediately. If users control when they want to
> > > > > > >> >> return, by waiting on the job completion future, how do we deal with this?
> > > > > > >> >> Do we simply remove the distinction between detached/non-detached?
> > > > > > >> >>
> > > > > > >> >>   - How does per-job mode interact with “interactive programming”
> > > > > > >> >> (FLIP-36)? For YARN, each execute() call could spawn a new Flink YARN
> > > > > > >> >> cluster. What about Mesos and Kubernetes?
> > > > > > >> >>
> > > > > > >> >> The first open question is where the opinions diverge, I think. The rest
> > > > > > >> >> are just open questions and interesting things that we need to consider.
> > > > > >> >>
> > > > > >> >> Best,
> > > > > >> >> Aljoscha
> > > > > >> >>
> > > > > >> >> [1]
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
> > > > > >> >> <
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
> > > > > >> >> >
> > > > > >> >>
> > > > > > >> >> > On 31. Jul 2019, at 15:23, Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > >> >> >
> > > > > >> >> > Thanks tison for the effort. I left a few comments.
> > > > > >> >> >
> > > > > >> >> >
> > > > > > >> >> > On Wed, Jul 31, 2019 at 8:24 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > > >> >> >
> > > > > >> >> >> Hi Flavio,
> > > > > >> >> >>
> > > > > >> >> >> Thanks for your reply.
> > > > > >> >> >>
> > > > > > >> >> >> Both in the current impl and in the design, ClusterClient
> > > > > > >> >> >> never takes responsibility for generating the JobGraph.
> > > > > > >> >> >> (What you see in the current codebase is several class methods.)
> > > > > > >> >> >>
> > > > > > >> >> >> Instead, the user describes his program in the main method
> > > > > > >> >> >> with ExecutionEnvironment apis and calls env.compile()
> > > > > > >> >> >> or env.optimize() to get FlinkPlan and JobGraph respectively.
> > > > > > >> >> >>
> > > > > > >> >> >> For listing main classes in a jar and choosing one for
> > > > > > >> >> >> submission, you're now able to customize a CLI to do it.
> > > > > > >> >> >> Specifically, the path of the jar is passed as an argument and
> > > > > > >> >> >> in the customized CLI you list main classes and choose one
> > > > > > >> >> >> to submit to the cluster.
> > > > > >> >> >>
> > > > > >> >> >> Best,
> > > > > >> >> >> tison.
> > > > > >> >> >>
> > > > > >> >> >>
> > > > > > >> >> >> On Wed, Jul 31, 2019 at 8:12 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > > > >> >> >>
> > > > > > >> >> >>> Just one note on my side: it is not clear to me whether the client needs to
> > > > > > >> >> >>> be able to generate a job graph or not.
> > > > > > >> >> >>> In my opinion, the job jar must reside only on the server/jobManager side
> > > > > > >> >> >>> and the client requires a way to get the job graph.
> > > > > > >> >> >>> If you really want access to the job graph, I'd add dedicated methods
> > > > > > >> >> >>> on the ClusterClient, like:
> > > > > > >> >> >>>
> > > > > > >> >> >>>   - getJobGraph(jarId, mainClass): JobGraph
> > > > > > >> >> >>>   - listMainClasses(jarId): List<String>
> > > > > > >> >> >>>
> > > > > > >> >> >>> These would require some additions on the job manager endpoint as
> > > > > > >> >> >>> well. What do you think?
> > > > > >> >> >>>
> > > > > > >> >> >>> On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > > >> >> >>>
> > > > > >> >> >>>> Hi all,
> > > > > >> >> >>>>
> > > > > > >> >> >>>> Here is a document[1] on client api enhancement from our perspective.
> > > > > > >> >> >>>> We have investigated the current implementations, and we propose to
> > > > > > >> >> >>>>
> > > > > > >> >> >>>> 1. Unify the implementation of cluster deployment and job submission in
> > > > > > >> >> >>>> Flink.
> > > > > > >> >> >>>> 2. Provide programmatic interfaces to allow flexible job and cluster
> > > > > > >> >> >>>> management.
> > > > > > >> >> >>>>
> > > > > > >> >> >>>> The first proposal is aimed at reducing the code paths of cluster
> > > > > > >> >> >>>> deployment and job submission so that one can adopt Flink in his usage
> > > > > > >> >> >>>> easily. The second proposal is aimed at providing rich interfaces for
> > > > > > >> >> >>>> advanced users who want precise control of these stages.
> > > > > > >> >> >>>>
> > > > > > >> >> >>>> Quick reference on open questions:
> > > > > > >> >> >>>>
> > > > > > >> >> >>>> 1. Exclude job cluster deployment from the client side or redefine the
> > > > > > >> >> >>>> semantics of job cluster? It fits in a process quite different from
> > > > > > >> >> >>>> session cluster deployment and job submission.
> > > > > > >> >> >>>>
> > > > > > >> >> >>>> 2. Maintain the codepaths handling class o.a.f.api.common.Program or
> > > > > > >> >> >>>> implement customized program handling logic by a customized CliFrontend?
> > > > > > >> >> >>>> See also this thread[2] and the document[1].
> > > > > > >> >> >>>>
> > > > > > >> >> >>>> 3. Expose ClusterClient as public api or just expose api in
> > > > > > >> >> >>>> ExecutionEnvironment and delegate them to ClusterClient? Further, in either
> > > > > > >> >> >>>> way is it worth introducing a JobClient which is an encapsulation of
> > > > > > >> >> >>>> ClusterClient that is associated with a specific job?
> > > > > >> >> >>>>
> > > > > >> >> >>>> Best,
> > > > > >> >> >>>> tison.
> > > > > >> >> >>>>
> > > > > >> >> >>>> [1]
> > > > > >> >> >>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
> > > > > >> >> >>>> [2]
> > > > > >> >> >>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
> > > > > >> >> >>>>
> > > > > > >> >> >>>> On Wed, Jul 24, 2019 at 9:19 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > >> >> >>>>
> > > > > > >> >> >>>>> Thanks Stephan, I will follow up on this issue in the next few weeks, and
> > > > > > >> >> >>>>> will refine the design doc. We could discuss more details after the 1.9
> > > > > > >> >> >>>>> release.
> > > > > >> >> >>>>>
> > > > > > >> >> >>>>> On Wed, Jul 24, 2019 at 12:58 AM Stephan Ewen <se...@apache.org> wrote:
> > > > > >> >> >>>>>
> > > > > >> >> >>>>>> Hi all!
> > > > > >> >> >>>>>>
> > > > > > >> >> >>>>>> This thread has stalled for a bit, which I assume is mostly due to the
> > > > > > >> >> >>>>>> Flink 1.9 feature freeze and release testing effort.
> > > > > > >> >> >>>>>>
> > > > > > >> >> >>>>>> I personally still recognize this issue as an important one to be solved.
> > > > > > >> >> >>>>>> I'd be happy to help resume this discussion soon (after the 1.9 release)
> > > > > > >> >> >>>>>> and see if we can take some steps towards this in Flink 1.10.
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> Best,
> > > > > >> >> >>>>>> Stephan
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>>
> > > > > > >> >> >>>>>> On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > > > >> >> >>>>>>
> > > > > > >> >> >>>>>>> That's exactly what I suggested a long time ago: the Flink REST client
> > > > > > >> >> >>>>>>> should not require any Flink dependency, only an http library to call the
> > > > > > >> >> >>>>>>> REST services to submit and monitor a job.
> > > > > > >> >> >>>>>>> What I suggested also in [1] was to have a way to automatically suggest to
> > > > > > >> >> >>>>>>> the user (via a UI) the available main classes and their required
> > > > > > >> >> >>>>>>> parameters[2].
> > > > > > >> >> >>>>>>> Another problem we have with Flink is that the REST client and the CLI one
> > > > > > >> >> >>>>>>> behave differently and we use the CLI client (via ssh) because it allows
> > > > > > >> >> >>>>>>> calling some other method after env.execute() [3] (we have to call another
> > > > > > >> >> >>>>>>> REST service to signal the end of the job).
> > > > > > >> >> >>>>>>> In this regard, a dedicated interface, like the JobListener suggested in
> > > > > > >> >> >>>>>>> the previous emails, would be very helpful (IMHO).
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
> > > > > >> >> >>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-10862
> > > > > >> >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-10879
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> Best,
> > > > > >> >> >>>>>>> Flavio
> > > > > >> >> >>>>>>>
> > > > > > >> >> >>>>>>> On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>>> Hi, Tison,
> > > > > >> >> >>>>>>>>
> > > > > > >> >> >>>>>>>> Thanks for your comments. Overall I agree with you that it is difficult for
> > > > > > >> >> >>>>>>>> downstream projects to integrate with flink and we need to refactor the
> > > > > > >> >> >>>>>>>> current flink client api.
> > > > > > >> >> >>>>>>>> And I agree that CliFrontend should only parse command line arguments and
> > > > > > >> >> >>>>>>>> then pass them to ExecutionEnvironment. It is ExecutionEnvironment's
> > > > > > >> >> >>>>>>>> responsibility to compile the job, create the cluster, and submit the job.
> > > > > > >> >> >>>>>>>> Besides that, currently flink has many ExecutionEnvironment
> > > > > > >> >> >>>>>>>> implementations, and flink will use the specific one based on the context.
> > > > > > >> >> >>>>>>>> IMHO, this is not necessary; ExecutionEnvironment should be able to do the
> > > > > > >> >> >>>>>>>> right thing based on the FlinkConf it receives. Too many
> > > > > > >> >> >>>>>>>> ExecutionEnvironment implementations are another burden for downstream
> > > > > > >> >> >>>>>>>> project integration.
> > > > > > >> >> >>>>>>>>
> > > > > > >> >> >>>>>>>> One thing I'd like to mention is flink's scala shell and sql client:
> > > > > > >> >> >>>>>>>> although they are sub-modules of flink, they could be treated as downstream
> > > > > > >> >> >>>>>>>> projects which use flink's client api. Currently you will find it is not
> > > > > > >> >> >>>>>>>> easy for them to integrate with flink, and they share much duplicated code.
> > > > > > >> >> >>>>>>>> It is another sign that we should refactor the flink client api.
> > > > > > >> >> >>>>>>>>
> > > > > > >> >> >>>>>>>> I believe it is a large and hard change, and I am afraid we cannot keep
> > > > > > >> >> >>>>>>>> compatibility since many of the changes are user facing.
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > > >> >> >>>>>>>> On Mon, Jun 24, 2019 at 2:53 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>> Hi all,
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> After a closer look on our client apis, I can see
> > there
> > > > are
> > > > > >> >> >> two
> > > > > >> >> >>>>> major
> > > > > >> >> >>>>>>>>> issues to consistency and integration, namely
> > different
> > > > > >> >> >>>> deployment
> > > > > >> >> >>>>> of
> > > > > >> >> >>>>>>>>> job cluster which couples job graph creation and
> > > cluster
> > > > > >> >> >>>>> deployment,
> > > > > >> >> >>>>>>>>> and submission via CliFrontend confusing control flow
> > > of
> > > > > job
> > > > > >> >> >>>> graph
> > > > > >> >> >>>>>>>>> compilation and job submission. I'd like to follow
> > the
> > > > > >> >> >> discuss
> > > > > >> >> >>>>> above,
> > > > > >> >> >>>>>>>>> mainly the process described by Jeff and Stephan, and
> > > > share
> > > > > >> >> >> my
> > > > > >> >> >>>>>>>>> ideas on these issues.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> 1) CliFrontend confuses the control flow of job
> > > > compilation
> > > > > >> >> >> and
> > > > > >> >> >>>>>>>> submission.
> > > > > >> >> >>>>>>>>> Following the process of job submission Stephan and
> > > Jeff
> > > > > >> >> >>>> described,
> > > > > >> >> >>>>>>>>> execution environment knows all configs of the
> > cluster
> > > > and
> > > > > >> >> >>>>>>> topos/settings
> > > > > >> >> >>>>>>>>> of the job. Ideally, in the main method of user
> > > program,
> > > > it
> > > > > >> >> >>> calls
> > > > > >> >> >>>>>>>> #execute
> > > > > >> >> >>>>>>>>> (or named #submit) and Flink deploys the cluster,
> > > compile
> > > > > the
> > > > > >> >> >>> job
> > > > > >> >> >>>>>> graph
> > > > > >> >> >>>>>>>>> and submit it to the cluster. However, current
> > > > CliFrontend
> > > > > >> >> >> does
> > > > > >> >> >>>> all
> > > > > >> >> >>>>>>> these
> > > > > >> >> >>>>>>>>> things inside its #runProgram method, which
> > introduces
> > > a
> > > > > lot
> > > > > >> >> >> of
> > > > > >> >> >>>>>>>> subclasses
> > > > > >> >> >>>>>>>>> of (stream) execution environment.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> Actually, it sets up an exec env that hijacks the
> > > > > >> >> >>>>>> #execute/executePlan
> > > > > >> >> >>>>>>>>> method, initializes the job graph and abort
> > execution.
> > > > And
> > > > > >> >> >> then
> > > > > >> >> >>>>>>>>> control flow back to CliFrontend, it deploys the
> > > > cluster(or
> > > > > >> >> >>>>> retrieve
> > > > > >> >> >>>>>>>>> the client) and submits the job graph. This is quite
> > a
> > > > > >> >> >> specific
> > > > > >> >> >>>>>>> internal
> > > > > >> >> >>>>>>>>> process inside Flink and none of consistency to
> > > anything.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> 2) Deployment of job cluster couples job graph
> > creation
> > > > and
> > > > > >> >> >>>> cluster
> > > > > >> >> >>>>>>>>> deployment. Abstractly, from user job to a concrete
> > > > > >> >> >> submission,
> > > > > >> >> >>>> it
> > > > > >> >> >>>>>>>> requires
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>>     create JobGraph --\
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> create ClusterClient -->  submit JobGraph
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> such a dependency. ClusterClient was created by
> > > deploying
> > > > > or
> > > > > >> >> >>>>>>> retrieving.
> > > > > >> >> >>>>>>>>> JobGraph submission requires a compiled JobGraph and
> > > > valid
> > > > > >> >> >>>>>>> ClusterClient,
> > > > > >> >> >>>>>>>>> but the creation of ClusterClient is abstractly
> > > > independent
> > > > > >> >> >> of
> > > > > >> >> >>>> that
> > > > > >> >> >>>>>> of
> > > > > >> >> >>>>>>>>> JobGraph. However, in job cluster mode, we deploy job
> > > > > cluster
> > > > > >> >> >>>> with
> > > > > >> >> >>>>> a
> > > > > >> >> >>>>>>> job
> > > > > >> >> >>>>>>>>> graph, which means we use another process:
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> create JobGraph --> deploy cluster with the JobGraph
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> Here is another inconsistency and downstream
> > > > > projects/client
> > > > > >> >> >>> apis
> > > > > >> >> >>>>> are
> > > > > >> >> >>>>>>>>> forced to handle different cases with rare supports
> > > from
> > > > > >> >> >> Flink.
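
To make the two flows described above easier to compare, here is a minimal Java sketch. All types in it (SketchJobGraph, SketchClusterClient, SubmissionFlows) are hypothetical stand-ins invented for illustration and are not Flink's actual classes; the sketch only records the order of the steps in each flow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a compiled job graph; not Flink's JobGraph.
final class SketchJobGraph {
    final String name;
    SketchJobGraph(String name) { this.name = name; }
}

// Hypothetical stand-in for a cluster client; not Flink's ClusterClient.
interface SketchClusterClient {
    String submit(SketchJobGraph graph);
}

final class SubmissionFlows {
    // Records each step so the two control flows can be compared.
    final List<String> steps = new ArrayList<>();

    SketchJobGraph createJobGraph(String program) {
        steps.add("create JobGraph");
        return new SketchJobGraph(program);
    }

    // Stands for "deploying or retrieving" a client; independent of any job graph.
    SketchClusterClient createClusterClient() {
        steps.add("create ClusterClient");
        return graph -> {
            steps.add("submit JobGraph");
            return "submitted:" + graph.name;
        };
    }

    // Session-style flow: graph and client are created independently, then joined.
    String sessionFlow(String program) {
        SketchJobGraph graph = createJobGraph(program);
        SketchClusterClient client = createClusterClient();
        return client.submit(graph);
    }

    // Job-cluster-style flow: the graph is an input to the deployment itself.
    String jobClusterFlow(String program) {
        SketchJobGraph graph = createJobGraph(program);
        steps.add("deploy cluster with the JobGraph");
        return "deployed-with:" + graph.name;
    }
}
```

Calling sessionFlow and jobClusterFlow on separate SubmissionFlows instances and comparing their steps lists shows the divergence: only the session-style flow ever has a ClusterClient to hand back to the caller.
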
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> Since we likely reached a consensus on
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> 1. all configs gathered by Flink configuration and
> > > passed
> > > > > >> >> >>>>>>>>> 2. execution environment knows all configs and
> > handles
> > > > > >> >> >>>>> execution(both
> > > > > >> >> >>>>>>>>> deployment and submission)
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> to the issues above I propose eliminating
> > > inconsistencies
> > > > > by
> > > > > >> >> >>>>>> following
> > > > > >> >> >>>>>>>>> approach:
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> 1) CliFrontend should exactly be a front end, at
> > least
> > > > for
> > > > > >> >> >>> "run"
> > > > > >> >> >>>>>>> command.
> > > > > >> >> >>>>>>>>> That means it just gathered and passed all config
> > from
> > > > > >> >> >> command
> > > > > >> >> >>>> line
> > > > > >> >> >>>>>> to
> > > > > >> >> >>>>>>>>> the main method of user program. Execution
> > environment
> > > > > knows
> > > > > >> >> >>> all
> > > > > >> >> >>>>> the
> > > > > >> >> >>>>>>> info
> > > > > >> >> >>>>>>>>> and with an addition to utils for ClusterClient, we
> > > > > >> >> >> gracefully
> > > > > >> >> >>>> get
> > > > > >> >> >>>>> a
> > > > > >> >> >>>>>>>>> ClusterClient by deploying or retrieving. In this
> > way,
> > > we
> > > > > >> >> >> don't
> > > > > >> >> >>>>> need
> > > > > >> >> >>>>>> to
> > > > > >> >> >>>>>>>>> hijack #execute/executePlan methods and can remove
> > > > various
> > > > > >> >> >>>> hacking
> > > > > >> >> >>>>>>>>> subclasses of exec env, as well as #run methods in
> > > > > >> >> >>>>> ClusterClient(for
> > > > > >> >> >>>>>> an
> > > > > >> >> >>>>>>>>> interface-ized ClusterClient). Now the control flow
> > > flows
> > > > > >> >> >> from
> > > > > >> >> >>>>>>>> CliFrontend
> > > > > >> >> >>>>>>>>> to the main method and never returns.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> 2) Job cluster means a cluster for the specific job.
> > > From
> > > > > >> >> >>> another
> > > > > >> >> >>>>>>>>> perspective, it is an ephemeral session. We may
> > > decouple
> > > > > the
> > > > > >> >> >>>>>> deployment
> > > > > >> >> >>>>>>>>> with a compiled job graph, but start a session with
> > > idle
> > > > > >> >> >>> timeout
> > > > > >> >> >>>>>>>>> and submit the job following.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> These topics, before we go into more details on
> > design
> > > or
> > > > > >> >> >>>>>>> implementation,
> > > > > >> >> >>>>>>>>> are better to be aware and discussed for a consensus.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> Best,
> > > > > >> >> >>>>>>>>> tison.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> Zili Chen <wander4...@gmail.com> 于2019年6月20日周四
> > > 上午3:21写道:
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>>> Hi Jeff,
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>> Thanks for raising this thread and the design
> > > document!
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>> As @Thomas Weise mentioned above, extending config
> > to
> > > > > flink
> > > > > >> >> >>>>>>>>>> requires far more effort than it should be. Another
> > > > > example
> > > > > >> >> >>>>>>>>>> is we achieve detach mode by introduce another
> > > execution
> > > > > >> >> >>>>>>>>>> environment which also hijack #execute method.
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>> I agree with your idea that user would configure all
> > > > > things
> > > > > >> >> >>>>>>>>>> and flink "just" respect it. On this topic I think
> > the
> > > > > >> >> >> unusual
> > > > > >> >> >>>>>>>>>> control flow when CliFrontend handle "run" command
> > is
> > > > the
> > > > > >> >> >>>> problem.
> > > > > >> >> >>>>>>>>>> It handles several configs, mainly about cluster
> > > > settings,
> > > > > >> >> >> and
> > > > > >> >> >>>>>>>>>> thus main method of user program is unaware of them.
> > > > Also
> > > > > it
> > > > > >> >> >>>>>> compiles
> > > > > >> >> >>>>>>>>>> app to job graph by run the main method with a
> > > hijacked
> > > > > exec
> > > > > >> >> >>>> env,
> > > > > >> >> >>>>>>>>>> which constrain the main method further.
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>> I'd like to write down a few of notes on
> > configs/args
> > > > pass
> > > > > >> >> >> and
> > > > > >> >> >>>>>>> respect,
> > > > > >> >> >>>>>>>>>> as well as decoupling job compilation and
> > submission.
> > > > > Share
> > > > > >> >> >> on
> > > > > >> >> >>>>> this
> > > > > >> >> >>>>>>>>>> thread later.
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>> Best,
> > > > > >> >> >>>>>>>>>> tison.
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月17日周一
> > > > > >> >> >> 下午7:29写道:
> > > > > >> >> >>>>>>>>>>
> > > > > >> >> >>>>>>>>>>> Hi Jeff and Flavio,
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>> Thanks Jeff a lot for proposing the design
> > document.
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>> We are also working on refactoring ClusterClient to
> > > > allow
> > > > > >> >> >>>>> flexible
> > > > > >> >> >>>>>>> and
> > > > > >> >> >>>>>>>>>>> efficient job management in our real-time platform.
> > > > > >> >> >>>>>>>>>>> We would like to draft a document to share our
> > ideas
> > > > with
> > > > > >> >> >>> you.
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>> I think it's a good idea to have something like
> > > Apache
> > > > > Livy
> > > > > >> >> >>> for
> > > > > >> >> >>>>>>> Flink,
> > > > > >> >> >>>>>>>>>>> and
> > > > > >> >> >>>>>>>>>>> the efforts discussed here will take a great step
> > > > forward
> > > > > >> >> >> to
> > > > > >> >> >>>> it.
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>> Regards,
> > > > > >> >> >>>>>>>>>>> Xiaogang
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>> Flavio Pompermaier <pomperma...@okkam.it>
> > > > 于2019年6月17日周一
> > > > > >> >> >>>>> 下午7:13写道:
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>> Is there any possibility to have something like
> > > Apache
> > > > > >> >> >> Livy
> > > > > >> >> >>>> [1]
> > > > > >> >> >>>>>>> also
> > > > > >> >> >>>>>>>>>>> for
> > > > > >> >> >>>>>>>>>>>> Flink in the future?
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>> [1] https://livy.apache.org/
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <
> > > > > >> >> >>> zjf...@gmail.com
> > > > > >> >> >>>>>
> > > > > >> >> >>>>>>> wrote:
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> Any API we expose should not have dependencies
> > > on
> > > > > >> >> >>> the
> > > > > >> >> >>>>>>> runtime
> > > > > >> >> >>>>>>>>>>>>> (flink-runtime) package or other implementation
> > > > > >> >> >> details.
> > > > > >> >> >>> To
> > > > > >> >> >>>>> me,
> > > > > >> >> >>>>>>>> this
> > > > > >> >> >>>>>>>>>>>> means
> > > > > >> >> >>>>>>>>>>>>> that the current ClusterClient cannot be exposed
> > to
> > > > > >> >> >> users
> > > > > >> >> >>>>>> because
> > > > > >> >> >>>>>>>> it
> > > > > >> >> >>>>>>>>>>>> uses
> > > > > >> >> >>>>>>>>>>>>> quite some classes from the optimiser and runtime
> > > > > >> >> >>> packages.
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>> We should change ClusterClient from class to
> > > > interface.
> > > > > >> >> >>>>>>>>>>>>> ExecutionEnvironment only use the interface
> > > > > >> >> >> ClusterClient
> > > > > >> >> >>>>> which
> > > > > >> >> >>>>>>>>>>> should be
> > > > > >> >> >>>>>>>>>>>>> in flink-clients while the concrete
> > implementation
> > > > > >> >> >> class
> > > > > >> >> >>>>> could
> > > > > >> >> >>>>>> be
> > > > > >> >> >>>>>>>> in
> > > > > >> >> >>>>>>>>>>>>> flink-runtime.
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> What happens when a failure/restart in the
> > > client
> > > > > >> >> >>>>> happens?
> > > > > >> >> >>>>>>>> There
> > > > > >> >> >>>>>>>>>>> need
> > > > > >> >> >>>>>>>>>>>>> to be a way of re-establishing the connection to
> > > the
> > > > > >> >> >> job,
> > > > > >> >> >>>> set
> > > > > >> >> >>>>>> up
> > > > > >> >> >>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>> listeners again, etc.
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>> Good point.  First we need to define what does
> > > > > >> >> >>>>> failure/restart
> > > > > >> >> >>>>>> in
> > > > > >> >> >>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>> client mean. IIUC, that usually mean network
> > > failure
> > > > > >> >> >>> which
> > > > > >> >> >>>>> will
> > > > > >> >> >>>>>>>>>>> happen in
> > > > > >> >> >>>>>>>>>>>>> class RestClient. If my understanding is correct,
> > > > > >> >> >>>>> restart/retry
> > > > > >> >> >>>>>>>>>>> mechanism
> > > > > >> >> >>>>>>>>>>>>> should be done in RestClient.
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org>
> > > 于2019年6月11日周二
> > > > > >> >> >>>>>> 下午11:10写道:
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>> Some points to consider:
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>> * Any API we expose should not have dependencies
> > > on
> > > > > >> >> >> the
> > > > > >> >> >>>>>> runtime
> > > > > >> >> >>>>>>>>>>>>>> (flink-runtime) package or other implementation
> > > > > >> >> >>> details.
> > > > > >> >> >>>> To
> > > > > >> >> >>>>>> me,
> > > > > >> >> >>>>>>>>>>> this
> > > > > >> >> >>>>>>>>>>>>> means
> > > > > >> >> >>>>>>>>>>>>>> that the current ClusterClient cannot be exposed
> > > to
> > > > > >> >> >>> users
> > > > > >> >> >>>>>>> because
> > > > > >> >> >>>>>>>>>>> it
> > > > > >> >> >>>>>>>>>>>>> uses
> > > > > >> >> >>>>>>>>>>>>>> quite some classes from the optimiser and
> > runtime
> > > > > >> >> >>>> packages.
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>> * What happens when a failure/restart in the
> > > client
> > > > > >> >> >>>>> happens?
> > > > > >> >> >>>>>>>> There
> > > > > >> >> >>>>>>>>>>> need
> > > > > >> >> >>>>>>>>>>>>> to
> > > > > >> >> >>>>>>>>>>>>>> be a way of re-establishing the connection to
> > the
> > > > > >> >> >> job,
> > > > > >> >> >>>> set
> > > > > >> >> >>>>> up
> > > > > >> >> >>>>>>> the
> > > > > >> >> >>>>>>>>>>>>> listeners
> > > > > >> >> >>>>>>>>>>>>>> again, etc.
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>> Aljoscha
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>> On 29. May 2019, at 10:17, Jeff Zhang <
> > > > > >> >> >>>> zjf...@gmail.com>
> > > > > >> >> >>>>>>>> wrote:
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>> Sorry folks, the design doc is late as you
> > > > > >> >> >> expected.
> > > > > >> >> >>>>> Here's
> > > > > >> >> >>>>>>> the
> > > > > >> >> >>>>>>>>>>>> design
> > > > > >> >> >>>>>>>>>>>>>> doc
> > > > > >> >> >>>>>>>>>>>>>>> I drafted, welcome any comments and feedback.
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>> Stephan Ewen <se...@apache.org> 于2019年2月14日周四
> > > > > >> >> >>>> 下午8:43写道:
> > > > > >> >> >>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> Nice that this discussion is happening.
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> In the FLIP, we could also revisit the entire
> > > role
> > > > > >> >> >>> of
> > > > > >> >> >>>>> the
> > > > > >> >> >>>>>>>>>>>> environments
> > > > > >> >> >>>>>>>>>>>>>>>> again.
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> Initially, the idea was:
> > > > > >> >> >>>>>>>>>>>>>>>> - the environments take care of the specific
> > > > > >> >> >> setup
> > > > > >> >> >>>> for
> > > > > >> >> >>>>>>>>>>> standalone
> > > > > >> >> >>>>>>>>>>>> (no
> > > > > >> >> >>>>>>>>>>>>>>>> setup needed), yarn, mesos, etc.
> > > > > >> >> >>>>>>>>>>>>>>>> - the session ones have control over the
> > > session.
> > > > > >> >> >>> The
> > > > > >> >> >>>>>>>>>>> environment
> > > > > >> >> >>>>>>>>>>>>> holds
> > > > > >> >> >>>>>>>>>>>>>>>> the session client.
> > > > > >> >> >>>>>>>>>>>>>>>> - running a job gives a "control" object for
> > > that
> > > > > >> >> >>>> job.
> > > > > >> >> >>>>>> That
> > > > > >> >> >>>>>>>>>>>> behavior
> > > > > >> >> >>>>>>>>>>>>> is
> > > > > >> >> >>>>>>>>>>>>>>>> the same in all environments.
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> The actual implementation diverged quite a bit
> > > > > >> >> >> from
> > > > > >> >> >>>>> that.
> > > > > >> >> >>>>>>>> Happy
> > > > > >> >> >>>>>>>>>>> to
> > > > > >> >> >>>>>>>>>>>>> see a
> > > > > >> >> >>>>>>>>>>>>>>>> discussion about straitening this out a bit
> > > more.
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <
> > > > > >> >> >>>>>>> zjf...@gmail.com>
> > > > > >> >> >>>>>>>>>>>> wrote:
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>> Hi folks,
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>> Sorry for late response, It seems we reach
> > > > > >> >> >>> consensus
> > > > > >> >> >>>> on
> > > > > >> >> >>>>>>>> this, I
> > > > > >> >> >>>>>>>>>>>> will
> > > > > >> >> >>>>>>>>>>>>>>>> create
> > > > > >> >> >>>>>>>>>>>>>>>>> FLIP for this with more detailed design
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>> Thomas Weise <t...@apache.org> 于2018年12月21日周五
> > > > > >> >> >>>>> 上午11:43写道:
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> Great to see this discussion seeded! The
> > > > > >> >> >> problems
> > > > > >> >> >>>> you
> > > > > >> >> >>>>>> face
> > > > > >> >> >>>>>>>>>>> with
> > > > > >> >> >>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>> Zeppelin integration are also affecting
> > other
> > > > > >> >> >>>>> downstream
> > > > > >> >> >>>>>>>>>>> projects,
> > > > > >> >> >>>>>>>>>>>>>> like
> > > > > >> >> >>>>>>>>>>>>>>>>>> Beam.
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> We just enabled the savepoint restore option
> > > in
> > > > > >> >> >>>>>>>>>>>>>> RemoteStreamEnvironment
> > > > > >> >> >>>>>>>>>>>>>>>>> [1]
> > > > > >> >> >>>>>>>>>>>>>>>>>> and that was more difficult than it should
> > be.
> > > > > >> >> >> The
> > > > > >> >> >>>>> main
> > > > > >> >> >>>>>>>> issue
> > > > > >> >> >>>>>>>>>>> is
> > > > > >> >> >>>>>>>>>>>>> that
> > > > > >> >> >>>>>>>>>>>>>>>>>> environment and cluster client aren't
> > > decoupled.
> > > > > >> >> >>>>> Ideally
> > > > > >> >> >>>>>>> it
> > > > > >> >> >>>>>>>>>>> should
> > > > > >> >> >>>>>>>>>>>>> be
> > > > > >> >> >>>>>>>>>>>>>>>>>> possible to just get the matching cluster
> > > client
> > > > > >> >> >>>> from
> > > > > >> >> >>>>>> the
> > > > > >> >> >>>>>>>>>>>>> environment
> > > > > >> >> >>>>>>>>>>>>>>>> and
> > > > > >> >> >>>>>>>>>>>>>>>>>> then control the job through it (environment
> > > as
> > > > > >> >> >>>>> factory
> > > > > >> >> >>>>>>> for
> > > > > >> >> >>>>>>>>>>>> cluster
> > > > > >> >> >>>>>>>>>>>>>>>>>> client). But note that the environment
> > classes
> > > > > >> >> >> are
> > > > > >> >> >>>>> part
> > > > > >> >> >>>>>> of
> > > > > >> >> >>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>> public
> > > > > >> >> >>>>>>>>>>>>>>>>> API,
> > > > > >> >> >>>>>>>>>>>>>>>>>> and it is not straightforward to make larger
> > > > > >> >> >>> changes
> > > > > >> >> >>>>>>> without
> > > > > >> >> >>>>>>>>>>>>> breaking
> > > > > >> >> >>>>>>>>>>>>>>>>>> backward compatibility.
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> ClusterClient currently exposes internal
> > > classes
> > > > > >> >> >>>> like
> > > > > >> >> >>>>>>>>>>> JobGraph and
> > > > > >> >> >>>>>>>>>>>>>>>>>> StreamGraph. But it should be possible to
> > wrap
> > > > > >> >> >>> this
> > > > > >> >> >>>>>> with a
> > > > > >> >> >>>>>>>> new
> > > > > >> >> >>>>>>>>>>>>> public
> > > > > >> >> >>>>>>>>>>>>>>>> API
> > > > > >> >> >>>>>>>>>>>>>>>>>> that brings the required job control
> > > > > >> >> >> capabilities
> > > > > >> >> >>>> for
> > > > > >> >> >>>>>>>>>>> downstream
> > > > > >> >> >>>>>>>>>>>>>>>>> projects.
> > > > > >> >> >>>>>>>>>>>>>>>>>> Perhaps it is helpful to look at some of the
> > > > > >> >> >>>>> interfaces
> > > > > >> >> >>>>>> in
> > > > > >> >> >>>>>>>>>>> Beam
> > > > > >> >> >>>>>>>>>>>>> while
> > > > > >> >> >>>>>>>>>>>>>>>>>> thinking about this: [2] for the portable
> > job
> > > > > >> >> >> API
> > > > > >> >> >>>> and
> > > > > >> >> >>>>>> [3]
> > > > > >> >> >>>>>>>> for
> > > > > >> >> >>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>> old
> > > > > >> >> >>>>>>>>>>>>>>>>>> asynchronous job control from the Beam Java
> > > SDK.
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> The backward compatibility discussion [4] is
> > > > > >> >> >> also
> > > > > >> >> >>>>>> relevant
> > > > > >> >> >>>>>>>>>>> here. A
> > > > > >> >> >>>>>>>>>>>>> new
> > > > > >> >> >>>>>>>>>>>>>>>>> API
> > > > > >> >> >>>>>>>>>>>>>>>>>> should shield downstream projects from
> > > internals
> > > > > >> >> >>> and
> > > > > >> >> >>>>>> allow
> > > > > >> >> >>>>>>>>>>> them to
> > > > > >> >> >>>>>>>>>>>>>>>>>> interoperate with multiple future Flink
> > > versions
> > > > > >> >> >>> in
> > > > > >> >> >>>>> the
> > > > > >> >> >>>>>>> same
> > > > > >> >> >>>>>>>>>>>> release
> > > > > >> >> >>>>>>>>>>>>>>>> line
> > > > > >> >> >>>>>>>>>>>>>>>>>> without forced upgrades.
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> Thanks,
> > > > > >> >> >>>>>>>>>>>>>>>>>> Thomas
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> [1]
> > https://github.com/apache/flink/pull/7249
> > > > > >> >> >>>>>>>>>>>>>>>>>> [2]
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > > > >> >> >>>>>>>>>>>>>>>>>> [3]
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > > > >> >> >>>>>>>>>>>>>>>>>> [4]
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> > https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <
> > > > > >> >> >>>>>>>> zjf...@gmail.com>
> > > > > >> >> >>>>>>>>>>>>> wrote:
> > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should
> > be
> > > > > >> >> >>> able
> > > > > >> >> >>>> to
> > > > > >> >> >>>>>>>> define
> > > > > >> >> >>>>>>>>>>>> where
> > > > > >> >> >>>>>>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>> job
> > > > > >> >> >>>>>>>>>>>>>>>>>>> runs (in your example Yarn). This is
> > actually
> > > > > >> >> >>>>>> independent
> > > > > >> >> >>>>>>>> of
> > > > > >> >> >>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>> job
> > > > > >> >> >>>>>>>>>>>>>>>>>>> development and is something which is
> > decided
> > > > > >> >> >> at
> > > > > >> >> >>>>>>> deployment
> > > > > >> >> >>>>>>>>>>> time.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>> User don't need to specify execution mode
> > > > > >> >> >>>>>>> programmatically.
> > > > > >> >> >>>>>>>>>>> They
> > > > > >> >> >>>>>>>>>>>>> can
> > > > > >> >> >>>>>>>>>>>>>>>>> also
> > > > > >> >> >>>>>>>>>>>>>>>>>>> pass the execution mode from the arguments
> > in
> > > > > >> >> >>> flink
> > > > > >> >> >>>>> run
> > > > > >> >> >>>>>>>>>>> command.
> > > > > >> >> >>>>>>>>>>>>> e.g.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m yarn-cluster ....
> > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m local ...
> > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m host:port ...
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>> Does this make sense to you ?
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> To me it makes sense that the
> > > > > >> >> >>>> ExecutionEnvironment
> > > > > >> >> >>>>>> is
> > > > > >> >> >>>>>>>> not
> > > > > >> >> >>>>>>>>>>>>>>>> directly
> > > > > >> >> >>>>>>>>>>>>>>>>>>> initialized by the user and instead context
> > > > > >> >> >>>> sensitive
> > > > > >> >> >>>>>> how
> > > > > >> >> >>>>>>>> you
> > > > > >> >> >>>>>>>>>>>> want
> > > > > >> >> >>>>>>>>>>>>> to
> > > > > >> >> >>>>>>>>>>>>>>>>>>> execute your job (Flink CLI vs. IDE, for
> > > > > >> >> >>> example).
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>> Right, currently I notice Flink would
> > create
> > > > > >> >> >>>>> different
> > > > > >> >> >>>>>>>>>>>>>>>>>>> ContextExecutionEnvironment based on
> > > different
> > > > > >> >> >>>>>> submission
> > > > > >> >> >>>>>>>>>>>> scenarios
> > > > > >> >> >>>>>>>>>>>>>>>>>> (Flink
> > > > > >> >> >>>>>>>>>>>>>>>>>>> Cli vs IDE). To me this is kind of hack
> > > > > >> >> >> approach,
> > > > > >> >> >>>> not
> > > > > >> >> >>>>>> so
> > > > > >> >> >>>>>>>>>>>>>>>>> straightforward.
> > > > > >> >> >>>>>>>>>>>>>>>>>>> What I suggested above is that is that
> > flink
> > > > > >> >> >>> should
> > > > > >> >> >>>>>>> always
> > > > > >> >> >>>>>>>>>>> create
> > > > > >> >> >>>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>> same
> > > > > >> >> >>>>>>>>>>>>>>>>>>> ExecutionEnvironment but with different
> > > > > >> >> >>>>> configuration,
> > > > > >> >> >>>>>>> and
> > > > > >> >> >>>>>>>>>>> based
> > > > > >> >> >>>>>>>>>>>> on
> > > > > >> >> >>>>>>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>>> configuration it would create the proper
> > > > > >> >> >>>>> ClusterClient
> > > > > >> >> >>>>>>> for
> > > > > >> >> >>>>>>>>>>>>> different
> > > > > >> >> >>>>>>>>>>>>>>>>>>> behaviors.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org>
> > > > > >> >> >>>> 于2018年12月20日周四
> > > > > >> >> >>>>>>>>>>> 下午11:18写道:
> > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> You are probably right that we have code
> > > > > >> >> >>>> duplication
> > > > > >> >> >>>>>>> when
> > > > > >> >> >>>>>>>> it
> > > > > >> >> >>>>>>>>>>>> comes
> > > > > >> >> >>>>>>>>>>>>>>>> to
> > > > > >> >> >>>>>>>>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> creation of the ClusterClient. This should
> > > be
> > > > > >> >> >>>>> reduced
> > > > > >> >> >>>>>> in
> > > > > >> >> >>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>> future.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should be
> > > > > >> >> >> able
> > > > > >> >> >>> to
> > > > > >> >> >>>>>>> define
> > > > > >> >> >>>>>>>>>>> where
> > > > > >> >> >>>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>> job
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> runs (in your example Yarn). This is
> > > actually
> > > > > >> >> >>>>>>> independent
> > > > > >> >> >>>>>>>>>>> of the
> > > > > >> >> >>>>>>>>>>>>>>>> job
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> development and is something which is
> > > decided
> > > > > >> >> >> at
> > > > > >> >> >>>>>>>> deployment
> > > > > >> >> >>>>>>>>>>>> time.
> > > > > >> >> >>>>>>>>>>>>>>>> To
> > > > > >> >> >>>>>>>>>>>>>>>>> me
> > > > > >> >> >>>>>>>>>>>>>>>>>>> it
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> makes sense that the ExecutionEnvironment
> > is
> > > > > >> >> >> not
> > > > > >> >> >>>>>>> directly
> > > > > >> >> >>>>>>>>>>>>>>>> initialized
> > > > > >> >> >>>>>>>>>>>>>>>>>> by
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> the user and instead context sensitive how
> > > you
> > > > > >> >> >>>> want
> > > > > >> >> >>>>> to
> > > > > >> >> >>>>>>>>>>> execute
> > > > > >> >> >>>>>>>>>>>>> your
> > > > > >> >> >>>>>>>>>>>>>>>>> job
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> (Flink CLI vs. IDE, for example).
> > However, I
> > > > > >> >> >>> agree
> > > > > >> >> >>>>>> that
> > > > > >> >> >>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> ExecutionEnvironment should give you
> > access
> > > to
> > > > > >> >> >>> the
> > > > > >> >> >>>>>>>>>>> ClusterClient
> > > > > >> >> >>>>>>>>>>>>>>>> and
> > > > > >> >> >>>>>>>>>>>>>>>>> to
> > > > > >> >> >>>>>>>>>>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> job (maybe in the form of the JobGraph or
> > a
> > > > > >> >> >> job
> > > > > >> >> >>>>> plan).
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> Cheers,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> Till
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff
> > Zhang <
> > > > > >> >> >>>>>>>>>>> zjf...@gmail.com>
> > > > > >> >> >>>>>>>>>>>>>>>> wrote:
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Hi Till,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. You are right
> > > that I
> > > > > >> >> >>>>> expect
> > > > > >> >> >>>>>>>> better
> > > > > >> >> >>>>>>>>>>>>>>>>>>> programmatic
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> job submission/control api which could be
> > > > > >> >> >> used
> > > > > >> >> >>> by
> > > > > >> >> >>>>>>>>>>> downstream
> > > > > >> >> >>>>>>>>>>>>>>>>> project.
> > > > > >> >> >>>>>>>>>>>>>>>>>>> And
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> it would benefit for the flink ecosystem.
> > > > > >> >> >> When
> > > > > >> >> >>> I
> > > > > >> >> >>>>> look
> > > > > >> >> >>>>>>> at
> > > > > >> >> >>>>>>>>>>> the
> > > > > >> >> >>>>>>>>>>>> code
> > > > > >> >> >>>>>>>>>>>>>>>>> of
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> flink
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> scala-shell and sql-client (I believe
> > they
> > > > > >> >> >> are
> > > > > >> >> >>>> not
> > > > > >> >> >>>>>> the
> > > > > >> >> >>>>>>>>>>> core of
> > > > > >> >> >>>>>>>>>>>>>>>>> flink,
> > > > > >> >> >>>>>>>>>>>>>>>>>>> but
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> belong to the ecosystem of flink), I find
> > > > > >> >> >> many
> > > > > >> >> >>>>>>> duplicated
> > > > > >> >> >>>>>>>>>>> code
> > > > > >> >> >>>>>>>>>>>>>>>> for
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> creating
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> ClusterClient from user provided
> > > > > >> >> >> configuration
> > > > > >> >> >>>>>>>>>>> (configuration
> > > > > >> >> >>>>>>>>>>>>>>>>> format
> > > > > >> >> >>>>>>>>>>>>>>>>>>> may
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> be
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> different from scala-shell and
> > sql-client)
> > > > > >> >> >> and
> > > > > >> >> >>>> then
> > > > > >> >> >>>>>> use
> > > > > >> >> >>>>>>>>>>> that
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> ClusterClient
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> to manipulate jobs. I don't think this is
> > > > > >> >> >>>>> convenient
> > > > > >> >> >>>>>>> for
> > > > > >> >> >>>>>>>>>>>>>>>> downstream
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> projects. What I expect is that
> > downstream
> > > > > >> >> >>>> project
> > > > > >> >> >>>>>> only
> > > > > >> >> >>>>>>>>>>> needs
> > > > > >> >> >>>>>>>>>>>> to
> > > > > >> >> >>>>>>>>>>>>>>>>>>> provide
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> necessary configuration info (maybe
> > > > > >> >> >> introducing
> > > > > >> >> >>>>> class
> > > > > >> >> >>>>>>>>>>>> FlinkConf),
> > > > > >> >> >>>>>>>>>>>>>>>>> and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>> then
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> build ExecutionEnvironment based on this
> > > > > >> >> >>>> FlinkConf,
> > > > > >> >> >>>>>> and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment will create the
> > proper
> > > > > >> >> >>>>>>>> ClusterClient.
> > > > > >> >> >>>>>>>>>>> It
> > > > > >> >> >>>>>>>>>>>> not
> > > > > >> >> >>>>>>>>>>>>>>>>>> only
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> benefit for the downstream project
> > > > > >> >> >> development
> > > > > >> >> >>>> but
> > > > > >> >> >>>>>> also
> > > > > >> >> >>>>>>>> be
> > > > > >> >> >>>>>>>>>>>>>>>> helpful
> > > > > >> >> >>>>>>>>>>>>>>>>>> for
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> their integration test with flink. Here's
> > > one
> > > > > >> >> >>>>> sample
> > > > > >> >> >>>>>>> code
> > > > > >> >> >>>>>>>>>>>> snippet
> > > > > >> >> >>>>>>>>>>>>>>>>>> that
> > > > > >> >> >>>>>>>>>>>>>>>>>>> I
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> expect.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val conf = new FlinkConf().mode("yarn")
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val env = new ExecutionEnvironment(conf)
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobId = env.submit(...)
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> env.getClusterClient().cancelJob(jobId)
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> What do you think?
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Hi Jeff,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> what you are proposing is to provide the user with better
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> programmatic job control. There was actually an effort to achieve
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> this but it has never been completed [1]. However, there are some
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> improvements in the code base now. Look for example at the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> NewClusterClient interface which offers a non-blocking job
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> submission. But I agree that we need to improve Flink in this regard.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I would not be in favour of exposing all ClusterClient calls via the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment because it would clutter the class and would
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> not be a good separation of concerns. Instead, one idea could be to
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> retrieve the current ClusterClient from the ExecutionEnvironment,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> which can then be used for cluster and job control. But before we
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> start an effort here, we need to agree on and capture what
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> functionality we want to provide.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> describing how to talk to a cluster manager like Yarn or Mesos. The
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> ClusterDescriptor can be used for deploying Flink clusters (job and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> session) and gives you a ClusterClient. The ClusterClient controls
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> the cluster (e.g. submitting jobs, listing all running jobs). And
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> then there was the idea to introduce a JobClient which you obtain
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> from the ClusterClient to trigger job-specific operations (e.g.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> taking a savepoint, cancelling the job).
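
A minimal Java sketch of the layering described above may help. Every type and
method name below (SketchClusterDescriptor, SketchClusterClient, SketchJobClient
and the in-memory implementations) is hypothetical and only illustrates the
idea; none of it is Flink's actual API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: job-scoped operations, obtained from the cluster client.
interface SketchJobClient {
    String jobId();
    String triggerSavepoint(String targetDirectory);
    void cancel();
}

// Hypothetical: cluster-scoped operations (submit jobs, list running jobs).
interface SketchClusterClient {
    SketchJobClient submitJob(String jobName);
    List<String> listJobs();
}

// Hypothetical: knows how to talk to a cluster manager and deploy a cluster.
interface SketchClusterDescriptor {
    SketchClusterClient deploySessionCluster();
}

// In-memory stand-in for a real cluster manager such as Yarn or Mesos.
class InMemoryClusterDescriptor implements SketchClusterDescriptor {
    @Override
    public SketchClusterClient deploySessionCluster() {
        return new InMemoryClusterClient();
    }
}

class InMemoryClusterClient implements SketchClusterClient {
    // Maps job id to job name for every job that has not been cancelled.
    private final Map<String, String> runningJobs = new LinkedHashMap<>();
    private int nextId = 0;

    @Override
    public SketchJobClient submitJob(String jobName) {
        final String id = "job-" + (nextId++);
        runningJobs.put(id, jobName);
        // The returned client is bound to this one job.
        return new SketchJobClient() {
            @Override
            public String jobId() {
                return id;
            }

            @Override
            public String triggerSavepoint(String targetDirectory) {
                // Only builds a path; a real client would ask the cluster.
                return targetDirectory + "/savepoint-" + id;
            }

            @Override
            public void cancel() {
                runningJobs.remove(id);
            }
        };
    }

    @Override
    public List<String> listJobs() {
        return new ArrayList<>(runningJobs.keySet());
    }
}
```

The point of the split is that cluster-level calls stay on the cluster client
while savepoints and cancellation hang off a per-job handle, so the
ExecutionEnvironment itself does not need to grow these methods.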
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Cheers,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Till
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Hi Folks,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is an
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> interactive notebook, and I hit several issues caused by the Flink
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> client API. So I'd like to propose the following changes to the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Flink client API.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 1. Support non-blocking execution. Currently,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment#execute is a blocking method that does two
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> things: it first submits the job and then waits until the job is
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> finished. I'd like to introduce a non-blocking execution method like
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment#submit, which only submits the job and then
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> returns the jobId to the client, and to allow the user to query the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> job status via the jobId.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 2. Add a cancel API to
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. Currently the only
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> way to cancel a job is via the CLI (bin/flink), which is not
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> convenient for downstream projects. So I'd like to add a cancel API
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> to ExecutionEnvironment.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 3. Add a savepoint API to
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. As with the cancel
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> API, we should use ExecutionEnvironment as the unified API for third
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> parties to integrate with Flink.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 4. Add a listener for the job execution lifecycle, something like
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> the following, so that downstream projects can run custom logic
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> during the lifecycle of a job. E.g. Zeppelin would capture the jobId
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> after the job is submitted and then use this jobId to cancel it
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> later when necessary.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> public interface JobListener {
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  void onJobSubmitted(JobID jobId);
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  void onJobExecuted(JobExecutionResult jobResult);
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  void onJobCanceled(JobID jobId);
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> }
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently this is
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> disabled, but a session is very convenient for third parties that
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> submit jobs continually. I hope Flink can enable it again.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 6. Unify all Flink client APIs into
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> This is a long-term issue which needs more careful thinking and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> design. Currently some features of Flink are exposed in
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> exposed in the CLI instead of the API, like the cancel and savepoint
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> operations I mentioned above. I think the root cause is that Flink
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> has not unified the ways of interacting with it. Here I list 3
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> scenarios of Flink operation:
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  - Local job execution. Flink will create a LocalEnvironment and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  then use this LocalEnvironment to create a LocalExecutor for job
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  execution.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  - Remote job execution. Flink will create a ClusterClient first,
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  then create a ContextEnvironment based on the ClusterClient, and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  then run the job.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  - Job cancelation. Flink will create a ClusterClient first and
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  then cancel the job via this ClusterClient.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> As you can see, in the above 3 scenarios Flink does not use the same
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> approach (code path) to interact with Flink.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> What I propose is the following:
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based on user
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> configuration) --> use this Environment to create the proper
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ClusterClient (LocalClusterClient or RestClusterClient) to interact
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> with Flink (job execution or cancelation).
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> This way we can unify the process of local execution and remote
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> execution. It also becomes much easier for third parties to
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> integrate with Flink, because ExecutionEnvironment is the unified
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> entry point for Flink. All a third party needs to do is pass
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> configuration to ExecutionEnvironment, and ExecutionEnvironment will
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> do the right thing based on the configuration. The Flink CLI can
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> also be considered a consumer of the Flink API: it just passes the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> configuration to ExecutionEnvironment and lets ExecutionEnvironment
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> create the proper ClusterClient instead of creating the
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ClusterClient directly.
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Item 6 would involve a large code refactoring, so I think we can
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> defer it to a future release; items 1, 2, 3, 4 and 5 could be done
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> at once, I believe. Let me know your comments and feedback, thanks.
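
To make items 1 and 4 of the proposal above concrete, here is a minimal Java
sketch of a non-blocking submit() next to a blocking execute(), with a listener
that captures the job id the way a notebook front end might. It is only an
illustration under stated assumptions: SketchEnvironment, SketchJobListener,
CapturingListener and isDone() are hypothetical names, String stands in for
Flink's JobID and JobExecutionResult, the "job" is a trivial background task,
and cancellation is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified stand-in for the JobListener proposed in item 4.
interface SketchJobListener {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobResult);
    void onJobCanceled(String jobId);
}

// Hypothetical environment: submit() returns at once, execute() blocks.
class SketchEnvironment {
    private final List<SketchJobListener> listeners = new CopyOnWriteArrayList<>();
    private final Map<String, CompletableFuture<String>> jobs = new ConcurrentHashMap<>();
    private int nextId = 0;

    void addJobListener(SketchJobListener listener) {
        listeners.add(listener);
    }

    // Non-blocking: starts the job in the background and returns its id.
    synchronized String submit(String jobName) {
        final String jobId = "job-" + (nextId++);
        listeners.forEach(l -> l.onJobSubmitted(jobId));
        jobs.put(jobId, CompletableFuture.supplyAsync(() -> {
            String result = jobName + " finished";
            listeners.forEach(l -> l.onJobExecuted(result));
            return result;
        }));
        return jobId;
    }

    // Status query by job id, as item 1 asks for.
    boolean isDone(String jobId) {
        return jobs.get(jobId).isDone();
    }

    // Blocking: submit, then wait for the result.
    String execute(String jobName) {
        return jobs.get(submit(jobName)).join();
    }
}

// Remembers the last job id and result, e.g. to cancel the job later.
class CapturingListener implements SketchJobListener {
    volatile String lastJobId;
    volatile String lastResult;

    @Override
    public void onJobSubmitted(String jobId) {
        lastJobId = jobId;
    }

    @Override
    public void onJobExecuted(String jobResult) {
        lastResult = jobResult;
    }

    @Override
    public void onJobCanceled(String jobId) {
        // Nothing to record in this sketch.
    }
}
```

In this shape the blocking call is just the non-blocking one plus a wait, which
is the relationship the proposal describes between execute and submit.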
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> --
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Best Regards
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Jeff Zhang
> > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
