Great, Kostas! Looking forward to your POC!

Best,
tison.


Jeff Zhang <zjf...@gmail.com> wrote on Fri, Aug 30, 2019 at 11:07 PM:

> Awesome, @Kostas! Looking forward to your POC.
>
> Kostas Kloudas <kklou...@gmail.com> wrote on Fri, Aug 30, 2019 at 8:33 PM:
>
> > Hi all,
> >
> > I am just writing here to let you know that I am working on a POC that
> > tries to refactor the current state of job submission in Flink.
> > I want to stress that it introduces NO CHANGES to the current
> > behaviour of Flink. It just re-arranges things and introduces the
> > notion of an Executor, which is the entity responsible for taking the
> > user-code and submitting it for execution.
> >
> > Given this, the discussion about the functionality that the JobClient
> > will expose to the user can go on independently and the same
> > holds for all the open questions so far.
> >
> > I hope I will have some more news to share soon.
> >
> > Thanks,
> > Kostas
> >
> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang <danrtsey...@gmail.com> wrote:
> > >
> > > Hi Zili,
> > >
> > > It makes sense to me that in per-job mode a dedicated cluster is
> > > started for a single job and will not accept more jobs.
> > > I just have a question about the command line.
> > >
> > > Currently we could use the following commands to start different
> > clusters.
> > > *per-job cluster*
> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
> > > examples/streaming/WindowJoin.jar
> > > *session cluster*
> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
> > > examples/streaming/WindowJoin.jar
> > >
> > > What will it look like after client enhancement?
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > Zili Chen <wander4...@gmail.com> wrote on Fri, Aug 23, 2019 at 10:46 PM:
> > >
> > > > Hi Till,
> > > >
> > > > Thanks for your update. Nice to hear :-)
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > >
> > > > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Aug 23, 2019 at 10:39 PM:
> > > >
> > > > > Hi Tison,
> > > > >
> > > > > just a quick comment concerning the class loading issues when using
> > > > > the per-job mode. The community wants to change it so that the
> > > > > StandaloneJobClusterEntryPoint actually uses the user code class
> > > > > loader with child-first class loading [1]. Hence, I hope that this
> > > > > problem will be resolved soon.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas <kklou...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > On the topic of web submission, I agree with Till that it only
> > > > > > seems to complicate things. It is bad for security, job isolation
> > > > > > (anybody can submit/cancel jobs), and its implementation complicates
> > > > > > some parts of the code. So, if we were to redesign the WebUI, maybe
> > > > > > this part could be left out. In addition, I would say that the
> > > > > > ability to cancel jobs could also be left out.
> > > > > >
> > > > > > I would also be in favour of removing the "detached" mode, for the
> > > > > > reasons mentioned above (i.e. because now we will have a future
> > > > > > representing the result on which the user can choose to wait or
> > > > > > not).
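> > > > > >
> > > > > > To make that concrete, a minimal sketch of how this could look on
> > > > > > the user side (a hedged illustration only; the method name
> > > > > > getJobResult() is an assumption, not an agreed API):
> > > > > >
> > > > > > // execute() hands back a future instead of blocking.
> > > > > > CompletableFuture<JobClient> submission = env.execute();
> > > > > >
> > > > > > // "Detached" behaviour: simply do not wait on the future.
> > > > > > // Blocking behaviour: wait for the handle and then for the result.
> > > > > > JobClient client = submission.get();
> > > > > > client.getJobResult().get();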
> > > > > >
> > > > > > Now, for separating job submission and cluster creation, I am in
> > > > > > favour of keeping both. Once again, the reasons are mentioned above
> > > > > > by Stephan, Till, Aljoscha, and Zili also seems to agree. They
> > > > > > mainly have to do with security, isolation and ease of resource
> > > > > > management for the user, as he knows that "when my job is done,
> > > > > > everything will be cleared up". This is also the experience you get
> > > > > > when launching a process on your local OS.
> > > > > >
> > > > > > On whether to exclude the per-job mode from returning a JobClient,
> > > > > > I believe that eventually it would be nice to allow users to get
> > > > > > back a JobClient. The reasons are that 1) I cannot find any
> > > > > > objective reason why the user experience should diverge, and
> > > > > > 2) this will be the way for the user to interact with his running
> > > > > > job. Assuming that the necessary ports are open for the REST API to
> > > > > > work, I think that the JobClient can run against the REST API
> > > > > > without problems. If the needed ports are not open, then we are
> > > > > > safe to not return a JobClient, as the user explicitly chose to
> > > > > > close all points of communication to his running job.
> > > > > >
> > > > > > On the topic of not hijacking "env.execute()" in order to get the
> > > > > > Plan, I definitely agree, but for the proposal of having a
> > > > > > "compile()" method in the env, I would like to have a better look
> > > > > > at the existing code.
> > > > > >
> > > > > > Cheers,
> > > > > > Kostas
> > > > > >
> > > > > > On Fri, Aug 23, 2019 at 5:52 AM Zili Chen <wander4...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > Hi Yang,
> > > > > > >
> > > > > > > It would be helpful if you check Stephan's last comment,
> > > > > > > which states that isolation is important.
> > > > > > >
> > > > > > > For per-job mode, we run a dedicated cluster (maybe it
> > > > > > > should have been a couple of JMs and TMs in the FLIP-6
> > > > > > > design) for a specific job. Thus the process is isolated
> > > > > > > from other jobs.
> > > > > > >
> > > > > > > In our case there was a time we suffered from multiple
> > > > > > > jobs submitted by different users affecting each other
> > > > > > > so that all of them ran into an error state. Also, running
> > > > > > > the client inside the cluster can save client resources
> > > > > > > at some point.
> > > > > > >
> > > > > > > However, we also face several issues, as you mentioned:
> > > > > > > in per-job mode it always uses the parent classloader,
> > > > > > > so classloading issues occur.
> > > > > > >
> > > > > > > BTW, one can make an analogy between the session/per-job
> > > > > > > modes in Flink and the client/cluster modes in Spark.
> > > > > > >
> > > > > > > Best,
> > > > > > > tison.
> > > > > > >
> > > > > > >
> > > > > > > > Yang Wang <danrtsey...@gmail.com> wrote on Thu, Aug 22, 2019 at 11:25 AM:
> > > > > > >
> > > > > > > > From the user's perspective, it is really confusing what the
> > > > > > > > scope of a per-job cluster is.
> > > > > > > >
> > > > > > > > If it means a Flink cluster with a single job, so that we get
> > > > > > > > better isolation, then it does not matter how we deploy the
> > > > > > > > cluster: deploy it directly with the job (mode 1), or start a
> > > > > > > > Flink cluster and then submit the job through the cluster
> > > > > > > > client (mode 2).
> > > > > > > >
> > > > > > > > Otherwise, if it just means direct deployment, how should we
> > > > > > > > name mode 2: session with a job, or something else?
> > > > > > > >
> > > > > > > > We could also benefit from mode 2. Users would get the same
> > > > > > > > isolation as with mode 1. The user code and dependencies would
> > > > > > > > be loaded by the user classloader to avoid class conflicts with
> > > > > > > > the framework.
> > > > > > > >
> > > > > > > > Anyway, both submission modes are useful. We just need to
> > > > > > > > clarify the concepts.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Yang
> > > > > > > >
> > > > > > > > Zili Chen <wander4...@gmail.com> wrote on Tue, Aug 20, 2019 at 5:58 PM:
> > > > > > > >
> > > > > > > > > Thanks for the clarification.
> > > > > > > > >
> > > > > > > > > The idea of a JobDeployer came into my mind when I was
> > > > > > > > > puzzling over how to execute per-job mode and session mode
> > > > > > > > > with the same user code and framework code path.
> > > > > > > > >
> > > > > > > > > With the JobDeployer concept we are back to the statement
> > > > > > > > > that the environment knows all configs of cluster deployment
> > > > > > > > > and job submission. We configure, or generate from the
> > > > > > > > > configuration, a specific JobDeployer in the environment, and
> > > > > > > > > then the code aligns on
> > > > > > > > >
> > > > > > > > > *JobClient client = env.execute().get();*
> > > > > > > > >
> > > > > > > > > which in session mode is returned by clusterClient.submitJob
> > > > > > > > > and in per-job mode by clusterDescriptor.deployJobCluster.
> > > > > > > > >
> > > > > > > > > Here comes a problem: currently we directly run the
> > > > > > > > > ClusterEntrypoint with an extracted job graph. Following the
> > > > > > > > > JobDeployer way, we had better align the entry point of
> > > > > > > > > per-job deployment at the JobDeployer. Users run their main
> > > > > > > > > method, or a CLI (which finally calls the main method), to
> > > > > > > > > deploy the job cluster.
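> > > > > > > > >
> > > > > > > > > For illustration, a rough sketch of the JobDeployer idea
> > > > > > > > > (purely hypothetical names and signatures, only to make the
> > > > > > > > > two code paths visible):
> > > > > > > > >
> > > > > > > > > interface JobDeployer {
> > > > > > > > >     // Hands back the JobClient the environment returns to the user.
> > > > > > > > >     CompletableFuture<JobClient> deploy(JobGraph jobGraph);
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > // Session mode: submit the job graph to an existing cluster.
> > > > > > > > > class SessionJobDeployer implements JobDeployer {
> > > > > > > > >     private final ClusterClient clusterClient;
> > > > > > > > >     SessionJobDeployer(ClusterClient clusterClient) { this.clusterClient = clusterClient; }
> > > > > > > > >     public CompletableFuture<JobClient> deploy(JobGraph jobGraph) {
> > > > > > > > >         return clusterClient.submitJob(jobGraph);
> > > > > > > > >     }
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > // Per-job mode: deploy a dedicated cluster together with the job graph.
> > > > > > > > > class PerJobDeployer implements JobDeployer {
> > > > > > > > >     private final ClusterDescriptor clusterDescriptor;
> > > > > > > > >     PerJobDeployer(ClusterDescriptor clusterDescriptor) { this.clusterDescriptor = clusterDescriptor; }
> > > > > > > > >     public CompletableFuture<JobClient> deploy(JobGraph jobGraph) {
> > > > > > > > >         return clusterDescriptor.deployJobCluster(jobGraph);
> > > > > > > > >     }
> > > > > > > > > }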
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > tison.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Stephan Ewen <se...@apache.org> wrote on Tue, Aug 20, 2019 at 4:40 PM:
> > > > > > > > >
> > > > > > > > > > Till has made some good comments here.
> > > > > > > > > >
> > > > > > > > > > Two things to add:
> > > > > > > > > >
> > > > > > > > > >   - The job mode is very nice in the way that it runs the
> > > > client
> > > > > > inside
> > > > > > > > > the
> > > > > > > > > > cluster (in the same image/process that is the JM) and
> thus
> > > > > unifies
> > > > > > > > both
> > > > > > > > > > applications and what the Spark world calls the "driver
> > mode".
> > > > > > > > > >
> > > > > > > > > >   - Another thing I would add is that during the FLIP-6
> > design,
> > > > > we
> > > > > > were
> > > > > > > > > > thinking about setups where Dispatcher and JobManager are
> > > > > separate
> > > > > > > > > > processes.
> > > > > > > > > >     A Yarn or Mesos Dispatcher of a session could run
> > > > > independently
> > > > > > > > (even
> > > > > > > > > > as privileged processes executing no code).
> > > > > > > > > >     Then the "per-job" mode could still be helpful:
> > when a
> > > > > job
> > > > > > is
> > > > > > > > > > submitted to the dispatcher, it launches the JM again in
> a
> > > > > per-job
> > > > > > > > mode,
> > > > > > > > > so
> > > > > > > > > > that JM and TM processes are bound to the job only. For
> > higher
> > > > > > security
> > > > > > > > > > setups, it is important that processes are not reused
> > across
> > > > > jobs.
> > > > > > > > > >
> > > > > > > > > > On Tue, Aug 20, 2019 at 10:27 AM Till Rohrmann <trohrm...@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > I would not be in favour of getting rid of the per-job
> > mode
> > > > > > since it
> > > > > > > > > > > simplifies the process of running Flink jobs
> > considerably.
> > > > > > Moreover,
> > > > > > > > it
> > > > > > > > > > is
> > > > > > > > > > > not only well suited for container deployments but also
> > for
> > > > > > > > deployments
> > > > > > > > > > > where you want to guarantee job isolation. For
> example, a
> > > > user
> > > > > > could
> > > > > > > > > use
> > > > > > > > > > > the per-job mode on Yarn to execute his job on a
> separate
> > > > > > cluster.
> > > > > > > > > > >
> > > > > > > > > > > I think that having two notions of cluster deployments
> > > > (session
> > > > > > vs.
> > > > > > > > > > per-job
> > > > > > > > > > > mode) does not necessarily contradict your ideas for
> the
> > > > client
> > > > > > api
> > > > > > > > > > > refactoring. For example one could have the following
> > > > > interfaces:
> > > > > > > > > > >
> > > > > > > > > > > - ClusterDeploymentDescriptor: encapsulates the logic
> > how to
> > > > > > deploy a
> > > > > > > > > > > cluster.
> > > > > > > > > > > - ClusterClient: allows to interact with a cluster
> > > > > > > > > > > - JobClient: allows to interact with a running job
> > > > > > > > > > >
> > > > > > > > > > > Now the ClusterDeploymentDescriptor could have two
> > methods:
> > > > > > > > > > >
> > > > > > > > > > > - ClusterClient deploySessionCluster()
> > > > > > > > > > > - JobClusterClient/JobClient
> > deployPerJobCluster(JobGraph)
> > > > > > > > > > >
> > > > > > > > > > > where JobClusterClient is either a supertype of
> > ClusterClient
> > > > > > which
> > > > > > > > > does
> > > > > > > > > > > not give you the functionality to submit jobs or
> > > > > > deployPerJobCluster
> > > > > > > > > > > returns directly a JobClient.
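> > > > > > > > > > >
> > > > > > > > > > > For illustration, those interfaces could be shaped roughly
> > > > > > > > > > > like this (a hedged sketch of the methods described above,
> > > > > > > > > > > not an existing API):
> > > > > > > > > > >
> > > > > > > > > > > interface ClusterDeploymentDescriptor {
> > > > > > > > > > >     // Deploys a session cluster and returns a client for it.
> > > > > > > > > > >     ClusterClient deploySessionCluster();
> > > > > > > > > > >     // Deploys a dedicated cluster for the given job and returns
> > > > > > > > > > >     // a handle that is scoped to that single job.
> > > > > > > > > > >     JobClient deployPerJobCluster(JobGraph jobGraph);
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > interface ClusterClient {
> > > > > > > > > > >     // Submitting a job to a session cluster yields a JobClient.
> > > > > > > > > > >     JobClient submitJob(JobGraph jobGraph);
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > interface JobClient {
> > > > > > > > > > >     // Interaction with a single running job (status, result, ...).
> > > > > > > > > > > }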
> > > > > > > > > > >
> > > > > > > > > > > When setting up the ExecutionEnvironment, one would
> then
> > not
> > > > > > provide
> > > > > > > > a
> > > > > > > > > > > ClusterClient to submit jobs but a JobDeployer which,
> > > > depending
> > > > > > on
> > > > > > > > the
> > > > > > > > > > > selected mode, either uses a ClusterClient (session
> > mode) to
> > > > > > submit
> > > > > > > > > jobs
> > > > > > > > > > or
> > > > > > > > > > > a ClusterDeploymentDescriptor to deploy a per-job mode
> > > > > > > > > > > cluster with the job to execute.
> > > > > > > > > > >
> > > > > > > > > > > These are just some thoughts on how one could make it work,
> > > > > > > > > > > because I believe there is some value in using the per-job
> > > > > > > > > > > mode from the ExecutionEnvironment.
> > > > > > > > > > >
> > > > > > > > > > > Concerning the web submission, this is indeed a bit tricky.
> > > > > > > > > > > From a cluster management standpoint, I would be in favour
> > > > > > > > > > > of not executing user code on the REST endpoint. Especially
> > > > > > > > > > > when considering security, it would be good to have a
> > > > > > > > > > > well-defined cluster behaviour where it is explicitly stated
> > > > > > > > > > > where user code and, thus, potentially risky code is
> > > > > > > > > > > executed. Ideally we limit it to the TaskExecutor and
> > > > > > > > > > > JobMaster.
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Till
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Aug 20, 2019 at 9:40 AM Flavio Pompermaier <pomperma...@okkam.it>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > In my opinion the client should not use any environment
> > > > > > > > > > > > to get the job graph, because the jar should reside ONLY
> > > > > > > > > > > > on the cluster (and not in the client classpath, otherwise
> > > > > > > > > > > > there are always inconsistencies between the client and
> > > > > > > > > > > > the Flink JobManager's classpath).
> > > > > > > > > > > > In the YARN, Mesos and Kubernetes scenarios you have the
> > > > > > > > > > > > jar, but you could also start a cluster that has the jar
> > > > > > > > > > > > on the JobManager (this is the only case where I think you
> > > > > > > > > > > > can assume that the client has the jar on the classpath;
> > > > > > > > > > > > in the REST job submission you don't have any classpath).
> > > > > > > > > > > >
> > > > > > > > > > > > Thus, always in my opinion, the JobGraph should be
> > > > > > > > > > > > generated by the JobManager REST API.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Aug 20, 2019 at 9:00 AM Zili Chen <wander4...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> I would like to involve Till & Stephan here to clarify
> > > > > > > > > > > >> some concepts of per-job mode.
> > > > > > > > > > > >>
> > > > > > > > > > > >> The term per-job is one of the modes a cluster can run
> > > > > > > > > > > >> in. It is mainly aimed at spawning a dedicated cluster
> > > > > > > > > > > >> for a specific job; the job can be packaged with Flink
> > > > > > > > > > > >> itself and the cluster initialized with the job, so that
> > > > > > > > > > > >> we get rid of a separate submission step.
> > > > > > > > > > > >>
> > > > > > > > > > > >> This is useful for container deployments where one
> > > > > > > > > > > >> creates his image with the job and then simply deploys
> > > > > > > > > > > >> the container.
> > > > > > > > > > > >>
> > > > > > > > > > > >> However, it is out of client scope, since a client
> > > > > > > > > > > >> (ClusterClient for example) is for communicating with an
> > > > > > > > > > > >> existing cluster and performing actions. Currently, in
> > > > > > > > > > > >> per-job mode, we extract the job graph and bundle it
> > > > > > > > > > > >> into the cluster deployment, and thus no concept of a
> > > > > > > > > > > >> client gets involved. It looks reasonable to exclude the
> > > > > > > > > > > >> deployment of a per-job cluster from the client api and
> > > > > > > > > > > >> use dedicated utility classes (deployers) for deployment.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Zili Chen <wander4...@gmail.com> wrote on Tue, Aug 20, 2019 at 12:37 PM:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hi Aljoscha,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for your reply and participation. The Google Doc
> > > > > > > > > > > >> > you linked to requires permission, and I think you could
> > > > > > > > > > > >> > use a share link instead.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I agree that we have almost reached a consensus that a
> > > > > > > > > > > >> > JobClient is necessary to interact with a running job.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Let me check your open questions one by one.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 1. Separate cluster creation and job submission
> for
> > > > > per-job
> > > > > > > > mode.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > As you mentioned, this is where the opinions diverge.
> > > > > > > > > > > >> > In my document there is an alternative [2] that proposes
> > > > > > > > > > > >> > excluding per-job deployment from the client api scope,
> > > > > > > > > > > >> > and now I find it more reasonable to do the exclusion.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > When in per-job mode, a dedicated JobCluster is launched
> > > > > > > > > > > >> > to execute the specific job. It is more like a Flink
> > > > > > > > > > > >> > application than a submission of a Flink job. The client
> > > > > > > > > > > >> > only takes care of job submission and assumes there is an
> > > > > > > > > > > >> > existing cluster. In this way we are able to consider
> > > > > > > > > > > >> > per-job issues individually, and JobClusterEntrypoint
> > > > > > > > > > > >> > would be the utility class for per-job deployment.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Nevertheless, the user program works in both session
> > > > > > > > > > > >> > mode and per-job mode without any need to change code.
> > > > > > > > > > > >> > The JobClient in per-job mode is returned from
> > > > > > > > > > > >> > env.execute as normal. However, it would no longer be a
> > > > > > > > > > > >> > wrapper of RestClusterClient but a wrapper of a
> > > > > > > > > > > >> > PerJobClusterClient which communicates with the
> > > > > > > > > > > >> > Dispatcher locally.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 2. How to deal with plan preview.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > With env.compile functions, users can get the JobGraph
> > > > > > > > > > > >> > or FlinkPlan and thus preview the plan programmatically.
> > > > > > > > > > > >> > Typically it looks like
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > if (preview configured) {
> > > > > > > > > > > >> >     FlinkPlan plan = env.compile();
> > > > > > > > > > > >> >     new JSONDumpGenerator(...).dump(plan);
> > > > > > > > > > > >> > } else {
> > > > > > > > > > > >> >     env.execute();
> > > > > > > > > > > >> > }
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > And `flink info` would no longer be valid.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 3. How to deal with Jar Submission at the Web
> > Frontend.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > There is one more thread that talked about this
> > > > > > > > > > > >> > topic [1]. Apart from removing the functionality, there
> > > > > > > > > > > >> > are two alternatives.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > One is to introduce an interface that has a method
> > > > > > > > > > > >> > returning a JobGraph/FlinkPlan, and have Jar Submission
> > > > > > > > > > > >> > only support main classes that implement this interface.
> > > > > > > > > > > >> > Then we extract the JobGraph/FlinkPlan just by calling
> > > > > > > > > > > >> > the method. In this way, it is even possible to consider
> > > > > > > > > > > >> > a separation of job creation and job submission.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The other is, as you mentioned, to let execute() do the
> > > > > > > > > > > >> > actual execution. We won't execute the main method in
> > > > > > > > > > > >> > the WebFrontend but spawn a process at the WebMonitor
> > > > > > > > > > > >> > side to execute it. For the return part, we could
> > > > > > > > > > > >> > generate the JobID at the WebMonitor and pass it to the
> > > > > > > > > > > >> > execution environment.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 4. How to deal with detached mode.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I think detached mode is a temporary solution for
> > > > > > > > > > > >> > non-blocking submission. In my document both submission
> > > > > > > > > > > >> > and execution return a CompletableFuture and users
> > > > > > > > > > > >> > control whether or not to wait for the result. With
> > > > > > > > > > > >> > this, we don't need a detached option, but the
> > > > > > > > > > > >> > functionality is covered.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 5. How does per-job mode interact with interactive
> > > > > > programming.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > All of the YARN, Mesos and Kubernetes scenarios follow
> > > > > > > > > > > >> > the pattern of launching a JobCluster now, and I don't
> > > > > > > > > > > >> > think there would be inconsistencies between different
> > > > > > > > > > > >> > resource managers.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> > tison.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1]
> > > > > > > > > > > >> > https://lists.apache.org/x/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > >> > [2]
> > > > > > > > > > > >> > https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?disco=AAAADZaGGfs
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Aljoscha Krettek <aljos...@apache.org> wrote on Fri, Aug 16, 2019 at 9:20 PM:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> Hi,
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> I read both Jeff's initial design document and the
> > > > > > > > > > > >> >> newer document by Tison. I also finally found the time
> > > > > > > > > > > >> >> to collect our thoughts on the issue; I had quite some
> > > > > > > > > > > >> >> discussions with Kostas and this is the result: [1].
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> I think overall we agree that this part of the
> > code is
> > > > in
> > > > > > dire
> > > > > > > > > need
> > > > > > > > > > > of
> > > > > > > > > > > >> >> some refactoring/improvements but I think there
> are
> > > > still
> > > > > > some
> > > > > > > > > open
> > > > > > > > > > > >> >> questions and some differences in opinion about what
> > > > > > > > > > > >> >> those refactorings should look like.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> I think the API-side is quite clear, i.e. we need
> > some
> > > > > > > > JobClient
> > > > > > > > > > API
> > > > > > > > > > > >> that
> > > > > > > > > > > >> >> allows interacting with a running Job. It could
> be
> > > > > > worthwhile
> > > > > > > > to
> > > > > > > > > > spin
> > > > > > > > > > > >> that
> > > > > > > > > > > >> >> off into a separate FLIP because we can probably
> > find
> > > > > > consensus
> > > > > > > > > on
> > > > > > > > > > > that
> > > > > > > > > > > >> >> part more easily.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> For the rest, the main open questions from our
> doc
> > are
> > > > > > these:
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - Do we want to separate cluster creation and
> job
> > > > > > submission
> > > > > > > > > for
> > > > > > > > > > > >> >> per-job mode? In the past, there were conscious
> > efforts
> > > > > to
> > > > > > > > *not*
> > > > > > > > > > > >> separate
> > > > > > > > > > > >> >> job submission from cluster creation for per-job
> > > > clusters
> > > > > > for
> > > > > > > > > > Mesos,
> > > > > > > > > > > >> YARN,
> > > > > > > > > > > >> >> Kubernetes (see StandaloneJobClusterEntryPoint).
> > Tison
> > > > > > suggests
> > > > > > > > in
> > > > > > > > > > his
> > > > > > > > > > > >> >> design document to decouple this in order to
> unify
> > job
> > > > > > > > > submission.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How to deal with plan preview, which needs to
> > > > hijack
> > > > > > > > > execute()
> > > > > > > > > > > and
> > > > > > > > > > > >> >> let the outside code catch an exception?
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How to deal with Jar Submission at the Web
> > > > Frontend,
> > > > > > which
> > > > > > > > > > needs
> > > > > > > > > > > to
> > > > > > > > > > > >> >> hijack execute() and let the outside code catch
> an
> > > > > > exception?
> > > > > > > > > > > >> >> CliFrontend.run() “hijacks”
> > > > > ExecutionEnvironment.execute()
> > > > > > to
> > > > > > > > > get a
> > > > > > > > > > > >> >> JobGraph and then execute that JobGraph manually.
> > We
> > > > > could
> > > > > > get
> > > > > > > > > > around
> > > > > > > > > > > >> that
> > > > > > > > > > > >> >> by letting execute() do the actual execution. One
> > > > caveat
> > > > > > for
> > > > > > > > this
> > > > > > > > > > is
> > > > > > > > > > > >> that
> > > > > > > > > > > >> >> now the main() method doesn’t return (or is
> forced
> > to
> > > > > > return by
> > > > > > > > > > > >> throwing an
> > > > > > > > > > > >> >> exception from execute()) which means that for
> Jar
> > > > > > Submission
> > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > >> >> WebFrontend we have a long-running main() method
> > > > running
> > > > > > in the
> > > > > > > > > > > >> >> WebFrontend. This doesn’t sound very good. We
> > could get
> > > > > > around
> > > > > > > > > this
> > > > > > > > > > > by
> > > > > > > > > > > >> >> removing the plan preview feature and by removing
> > Jar
> > > > > > > > > > > >> Submission/Running.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How to deal with detached mode? Right now,
> > > > > > > > > DetachedEnvironment
> > > > > > > > > > > will
> > > > > > > > > > > >> >> execute the job and return immediately. If users
> > > > control
> > > > > > when
> > > > > > > > > they
> > > > > > > > > > > >> want to
> > > > > > > > > > > >> >> return, by waiting on the job completion future,
> > how do
> > > > > we
> > > > > > deal
> > > > > > > > > > with
> > > > > > > > > > > >> this?
> > > > > > > > > > > >> >> Do we simply remove the distinction between
> > > > > > > > > detached/non-detached?
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How does per-job mode interact with
> > “interactive
> > > > > > > > programming”
> > > > > > > > > > > >> >> (FLIP-36). For YARN, each execute() call could
> > spawn a
> > > > > new
> > > > > > > > Flink
> > > > > > > > > > YARN
> > > > > > > > > > > >> >> cluster. What about Mesos and Kubernetes?
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> The first open question is where the opinions
> > diverge,
> > > > I
> > > > > > think.
> > > > > > > > > The
> > > > > > > > > > > >> rest
> > > > > > > > > > > >> >> are just open questions and interesting things
> > that we
> > > > > > need to
> > > > > > > > > > > >> consider.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> Best,
> > > > > > > > > > > >> >> Aljoscha
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> [1]
> > > > > > > > > > > >> >> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> > On 31. Jul 2019, at 15:23, Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Thanks tison for the effort. I left a few
> > comments.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Zili Chen <wander4...@gmail.com> wrote on Wed, Jul 31, 2019 at 8:24 PM:
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >> Hi Flavio,
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >> Thanks for your reply.
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >> In either the current implementation or the design,
> > > > > > > > > > > >> >> >> the ClusterClient never takes responsibility for
> > > > > > > > > > > >> >> >> generating the JobGraph (what you see in the current
> > > > > > > > > > > >> >> >> codebase are several class methods).
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >> Instead, the user describes his program in the main
> > > > > > > > > > > >> >> >> method with ExecutionEnvironment apis and calls
> > > > > > > > > > > >> >> >> env.compile() or env.optimize() to get the FlinkPlan
> > > > > > > > > > > >> >> >> or JobGraph respectively.
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >> For listing the main classes in a jar and choosing
> > > > > > > > > > > >> >> >> one for submission, you are already able to customize
> > > > > > > > > > > >> >> >> a CLI to do it. Specifically, the path of the jar is
> > > > > > > > > > > >> >> >> passed as an argument, and in the customized CLI you
> > > > > > > > > > > >> >> >> list the main classes and choose one to submit to the
> > > > > > > > > > > >> >> >> cluster.
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >> Best,
> > > > > > > > > > > >> >> >> tison.
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >> Flavio Pompermaier <pomperma...@okkam.it> wrote on Wed, Jul 31, 2019 at 8:12 PM:
> > > > > > > > > > > >> >> >>
> > > > > > > > > > > >> >> >>> Just one note on my side: it is not clear to
> me
> > > > > > whether the
> > > > > > > > > > > client
> > > > > > > > > > > >> >> needs
> > > > > > > > > > > >> >> >> to
> > > > > > > > > > > >> >> >>> be able to generate a job graph or not.
> > > > > > > > > > > >> >> >>> In my opinion, the job jar must reside only on the
> > > > > > > > > > > >> >> >>> server/jobManager side, and the client requires a
> > > > > > > > > > > >> >> >>> way to get the job graph.
> > > > > > > > > > > >> >> >>> If you really want to access the job graph, I'd add
> > > > > > > > > > > >> >> >>> a dedicated method on the ClusterClient, like:
> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>>   - getJobGraph(jarId, mainClass): JobGraph
> > > > > > > > > > > >> >> >>>   - listMainClasses(jarId): List<String>
> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>> These would require some additions on the job
> > > > > > > > > > > >> >> >>> manager endpoint as well... what do you think?
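> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>> For illustration, the proposed additions could look
> > > > > > > > > > > >> >> >>> roughly like this (hedged sketch, not an existing API):
> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>> // Ask the cluster to compile a job graph from a jar it already holds.
> > > > > > > > > > > >> >> >>> JobGraph getJobGraph(String jarId, String mainClass);
> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>> // Discover the entry points a previously uploaded jar offers.
> > > > > > > > > > > >> >> >>> List<String> listMainClasses(String jarId);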
> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>> On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com>
> > > > > > > > > > > >> >> >>> wrote:
> > > > > > > > > > > >> >> >>>
> > > > > > > > > > > >> >> >>>> Hi all,
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> Here is a document[1] on client api
> > enhancement
> > > > from
> > > > > > our
> > > > > > > > > > > >> perspective.
> > > > > > > > > > > >> >> >>>> We have investigated current
> implementations.
> > And
> > > > we
> > > > > > > > propose
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> 1. Unify the implementation of cluster
> > deployment
> > > > > and
> > > > > > job
> > > > > > > > > > > >> submission
> > > > > > > > > > > >> >> in
> > > > > > > > > > > >> >> >>>> Flink.
> > > > > > > > > > > >> >> >>>> 2. Provide programmatic interfaces to allow
> > > > flexible
> > > > > > job
> > > > > > > > and
> > > > > > > > > > > >> cluster
> > > > > > > > > > > >> >> >>>> management.
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> The first proposal is aimed at reducing code
> > paths
> > > > > of
> > > > > > > > > cluster
> > > > > > > > > > > >> >> >> deployment
> > > > > > > > > > > >> >> >>>> and
> > > > > > > > > > > >> >> >>>> job submission so that one can adopt Flink
> in
> > his
> > > > > > usage
> > > > > > > > > > easily.
> > > > > > > > > > > >> The
> > > > > > > > > > > >> >> >>> second
> > > > > > > > > > > >> >> >>>> proposal is aimed at providing rich interfaces for
> > > > > > > > > > > >> >> >>>> advanced users who want to have precise control of
> > > > > > > > > > > >> >> >>>> these stages.
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> Quick reference on open questions:
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> 1. Exclude job cluster deployment from
> client
> > side
> > > > > or
> > > > > > > > > redefine
> > > > > > > > > > > the
> > > > > > > > > > > >> >> >>> semantic
> > > > > > > > > > > >> >> >>>> of job cluster? Since it fits in a process
> > quite
> > > > > > different
> > > > > > > > > > from
> > > > > > > > > > > >> >> session
> > > > > > > > > > > >> >> >>>> cluster deployment and job submission.
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> 2. Maintain the codepaths handling class
> > > > > > > > > > > o.a.f.api.common.Program
> > > > > > > > > > > >> or
> > > > > > > > > > > >> >> >>>> implement customized program handling logic
> by
> > > > > > customized
> > > > > > > > > > > >> >> CliFrontend?
> > > > > > > > > > > >> >> >>>> See also this thread[2] and the document[1].
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> 3. Expose ClusterClient as public api or
> just
> > > > expose
> > > > > > api
> > > > > > > > in
> > > > > > > > > > > >> >> >>>> ExecutionEnvironment
> > > > > > > > > > > >> >> >>>> and delegate them to ClusterClient? Further, in
> > > > > > > > > > > >> >> >>>> either way, is it worth introducing a JobClient
> > > > > > > > > > > >> >> >>>> which is an encapsulation of ClusterClient that is
> > > > > > > > > > > >> >> >>>> associated with a specific job?
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> Best,
> > > > > > > > > > > >> >> >>>> tison.
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> [1]
> > > > > > > > > > > >> >> >>>> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
> > > > > > > > > > > >> >> >>>> [2]
> > > > > > > > > > > >> >> >>>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>> Jeff Zhang <zjf...@gmail.com> wrote on Wed, Jul 24, 2019 at 9:19 AM:
> > > > > > > > > > > >> >> >>>>
> > > > > > > > > > > >> >> >>>>> Thanks Stephan, I will follow up on this issue in
> > > > > > > > > > > >> >> >>>>> the next few weeks and will refine the design doc.
> > > > > > > > > > > >> >> >>>>> We could discuss more details after the 1.9
> > > > > > > > > > > >> >> >>>>> release.
> > > > > > > > > > > >> >> >>>>>
> > > > > > > > > > > >> >> >>>>> Stephan Ewen <se...@apache.org> wrote on Wed, Jul 24, 2019 at 12:58 AM:
> > > > > > > > > > > >> >> >>>>>
> > > > > > > > > > > >> >> >>>>>> Hi all!
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>> This thread has stalled for a bit, which I assume
> > > > > > > > > > > >> >> >>>>>> is mostly due to the Flink 1.9 feature freeze and
> > > > > > > > > > > >> >> >>>>>> release testing effort.
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>> I personally still recognize this issue as one
> > > > > > > > > > > >> >> >>>>>> important to be solved. I'd be happy to help
> > > > > > > > > > > >> >> >>>>>> resume this discussion soon (after the 1.9
> > > > > > > > > > > >> >> >>>>>> release) and see if we can take some steps towards
> > > > > > > > > > > >> >> >>>>>> this in Flink 1.10.
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>> Best,
> > > > > > > > > > > >> >> >>>>>> Stephan
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>> On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier
> > > > > > > > > > > >> >> >>>>>> <pomperma...@okkam.it> wrote:
> > > > > > > > > > > >> >> >>>>>>
> > > > > > > > > > > >> >> >>>>>>> That's exactly what I suggested a long time ago:
> > > > > > > > > > > >> >> >>>>>>> the Flink REST client should not require any
> > > > > > > > > > > >> >> >>>>>>> Flink dependency, only an HTTP library to call
> > > > > > > > > > > >> >> >>>>>>> the REST services to submit and monitor a job.
> > > > > > > > > > > >> >> >>>>>>> What I also suggested in [1] was to have a way
> > > > > > > > > > > >> >> >>>>>>> to automatically suggest to the user (via a UI)
> > > > > > > > > > > >> >> >>>>>>> the available main classes and their required
> > > > > > > > > > > >> >> >>>>>>> parameters [2].
> > > > > > > > > > > >> >> >>>>>>> Another problem we have with Flink is that the
> > > > > > > > > > > >> >> >>>>>>> REST client and the CLI one behave differently,
> > > > > > > > > > > >> >> >>>>>>> and we use the CLI client (via ssh) because it
> > > > > > > > > > > >> >> >>>>>>> allows us to call some other method after
> > > > > > > > > > > >> >> >>>>>>> env.execute() [3] (we have to call another REST
> > > > > > > > > > > >> >> >>>>>>> service to signal the end of the job).
> > > > > > > > > > > >> >> >>>>>>> In this regard, a dedicated interface, like the
> > > > > > > > > > > >> >> >>>>>>> JobListener suggested in the previous emails,
> > > > > > > > > > > >> >> >>>>>>> would be very helpful (IMHO).
> > > > > > > > > > > >> >> >>>>>>>
> > > > > > > > > > > >> >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
> > > > > > > > > > > >> >> >>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-10862
> > > > > > > > > > > >> >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-10879
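> > > > > > > > > > > >> >> >>>>>>>
> > > > > > > > > > > >> >> >>>>>>> For illustration, the kind of JobListener meant here
> > > > > > > > > > > >> >> >>>>>>> could look roughly like this (a hedged sketch with
> > > > > > > > > > > >> >> >>>>>>> hypothetical method names):
> > > > > > > > > > > >> >> >>>>>>>
> > > > > > > > > > > >> >> >>>>>>> public interface JobListener {
> > > > > > > > > > > >> >> >>>>>>>     // Called once the job has been handed to the cluster.
> > > > > > > > > > > >> >> >>>>>>>     void onJobSubmitted(JobID jobId);
> > > > > > > > > > > >> >> >>>>>>>     // Called when the job finishes, so follow-up work (e.g. the
> > > > > > > > > > > >> >> >>>>>>>     // extra REST call mentioned above) runs without hijacking execute().
> > > > > > > > > > > >> >> >>>>>>>     void onJobExecuted(JobExecutionResult result);
> > > > > > > > > > > >> >> >>>>>>> }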
> > > > > > > > > > > >> >> >>>>>>>
> > > > > > > > > > > >> >> >>>>>>> Best,
> > > > > > > > > > > >> >> >>>>>>> Flavio
> > > > > > > > > > > >> >> >>>>>>>
> > > > > > > > > > > >> >> >>>>>>> On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang
> > > > > > > > > > > >> >> >>>>>>> <zjf...@gmail.com> wrote:
> > > > > > > > > > > >> >> >>>>>>>
> > > > > > > > > > > >> >> >>>>>>>> Hi, Tison,
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>> Thanks for your comments. Overall I agree with
> > > > > > > > > > > >> >> >>>>>>>> you that it is difficult for downstream projects
> > > > > > > > > > > >> >> >>>>>>>> to integrate with Flink and we need to refactor
> > > > > > > > > > > >> >> >>>>>>>> the current Flink client api.
> > > > > > > > > > > >> >> >>>>>>>> And I agree that CliFrontend should only
> > > > parsing
> > > > > > > > command
> > > > > > > > > > > line
> > > > > > > > > > > >> >> >>>>> arguments
> > > > > > > > > > > >> >> >>>>>>> and
> > > > > > > > > > > >> >> >>>>>>>> then pass them to ExecutionEnvironment.
> > It is
> > > > > > > > > > > >> >> >>>> ExecutionEnvironment's
> > > > > > > > > > > >> >> >>>>>>>> responsibility to compile job, create
> > cluster,
> > > > > and
> > > > > > > > > submit
> > > > > > > > > > > job.
> > > > > > > > > > > >> >> >>>>> Besides
> > > > > > > > > > > >> >> >>>>>>>> that, Currently flink has many
> > > > > > ExecutionEnvironment
> > > > > > > > > > > >> >> >>>> implementations,
> > > > > > > > > > > >> >> >>>>>> and
> > > > > > > > > > > >> >> >>>>>>>> flink will use the specific one based on
> > the
> > > > > > context.
> > > > > > > > > > IMHO,
> > > > > > > > > > > it
> > > > > > > > > > > >> >> >> is
> > > > > > > > > > > >> >> >>>> not
> > > > > > > > > > > >> >> >>>>>>>> necessary, ExecutionEnvironment should
> be
> > able
> > > > > to
> > > > > > do
> > > > > > > > the
> > > > > > > > > > > right
> > > > > > > > > > > >> >> >>>> thing
> > > > > > > > > > > >> >> >>>>>>> based
> > > > > > > > > > > >> >> >>>>>>>> on the FlinkConf it is received. Too
> many
> > > > > > > > > > > ExecutionEnvironment
> > > > > > > > > > > >> >> >>>>>>>> implementations are another burden for
> > > > downstream
> > > > > > > > project
> > > > > > > > > > > >> >> >>>> integration.
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>> One thing I'd like to mention is flink's
> > scala
> > > > > > shell
> > > > > > > > and
> > > > > > > > > > sql
> > > > > > > > > > > >> >> >>>> client,
> > > > > > > > > > > >> >> >>>>>>>> although they are sub-modules of flink,
> > they
> > > > > > could be
> > > > > > > > > > > treated
> > > > > > > > > > > >> >> >> as
> > > > > > > > > > > >> >> >>>>>>> downstream
> > > > > > > > > > > >> >> >>>>>>>> project which use flink's client api.
> > > > Currently
> > > > > > you
> > > > > > > > will
> > > > > > > > > > > find
> > > > > > > > > > > >> >> >> it
> > > > > > > > > > > >> >> >>> is
> > > > > > > > > > > >> >> >>>>> not
> > > > > > > > > > > >> >> >>>>>>>> easy for them to integrate with flink,
> > they
> > > > > share
> > > > > > many
> > > > > > > > > > > >> >> >> duplicated
> > > > > > > > > > > >> >> >>>>> code.
> > > > > > > > > > > >> >> >>>>>>> It
> > > > > > > > > > > >> >> >>>>>>>> is another sign that we should refactor
> > flink
> > > > > > client
> > > > > > > > > api.
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>> I believe it is a large and hard change,
> > and I
> > > > > am
> > > > > > > > afraid
> > > > > > > > > > we
> > > > > > > > > > > >> can
> > > > > > > > > > > >> >> >>> not
> > > > > > > > > > > >> >> >>>>>> keep
> > > > > > > > > > > >> >> >>>>>>>> compatibility since many of changes are
> > user
> > > > > > facing.
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>> Zili Chen <wander4...@gmail.com> wrote on Mon, Jun 24, 2019 at 2:53 PM:
> > > > > > > > > > > >> >> >>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> Hi all,
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> After a closer look on our client apis,
> > I can
> > > > > see
> > > > > > > > there
> > > > > > > > > > are
> > > > > > > > > > > >> >> >> two
> > > > > > > > > > > >> >> >>>>> major
> > > > > > > > > > > >> >> >>>>>>>>> issues to consistency and integration,
> > namely
> > > > > > > > different
> > > > > > > > > > > >> >> >>>> deployment
> > > > > > > > > > > >> >> >>>>> of
> > > > > > > > > > > >> >> >>>>>>>>> job cluster which couples job graph
> > creation
> > > > > and
> > > > > > > > > cluster
> > > > > > > > > > > >> >> >>>>> deployment,
> > > > > > > > > > > >> >> >>>>>>>>> and submission via CliFrontend
> confusing
> > > > > control
> > > > > > flow
> > > > > > > > > of
> > > > > > > > > > > job
> > > > > > > > > > > >> >> >>>> graph
> > > > > > > > > > > >> >> >>>>>>>>> compilation and job submission. I'd
> like
> > to
> > > > > > follow
> > > > > > > > the
> > > > > > > > > > > >> >> >> discuss
> > > > > > > > > > > >> >> >>>>> above,
> > > > > > > > > > > >> >> >>>>>>>>> mainly the process described by Jeff
> and
> > > > > > Stephan, and
> > > > > > > > > > share
> > > > > > > > > > > >> >> >> my
> > > > > > > > > > > >> >> >>>>>>>>> ideas on these issues.
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> 1) CliFrontend confuses the control
> flow
> > of
> > > > job
> > > > > > > > > > compilation
> > > > > > > > > > > >> >> >> and
> > > > > > > > > > > >> >> >>>>>>>> submission.
> > > > > > > > > > > >> >> >>>>>>>>> Following the process of job submission
> > > > Stephan
> > > > > > and
> > > > > > > > > Jeff
> > > > > > > > > > > >> >> >>>> described,
> > > > > > > > > > > >> >> >>>>>>>>> execution environment knows all configs
> > of
> > > > the
> > > > > > > > cluster
> > > > > > > > > > and
> > > > > > > > > > > >> >> >>>>>>> topos/settings
> > > > > > > > > > > >> >> >>>>>>>>> of the job. Ideally, in the main method
> > of
> > > > user
> > > > > > > > > program,
> > > > > > > > > > it
> > > > > > > > > > > >> >> >>> calls
> > > > > > > > > > > >> >> >>>>>>>> #execute
> > > > > > > > > > > >> >> >>>>>>>>> (or named #submit) and Flink deploys
> the
> > > > > cluster,
> > > > > > > > > compile
> > > > > > > > > > > the
> > > > > > > > > > > >> >> >>> job
> > > > > > > > > > > >> >> >>>>>> graph
> > > > > > > > > > > >> >> >>>>>>>>> and submit it to the cluster. However,
> > > > current
> > > > > > > > > > CliFrontend
> > > > > > > > > > > >> >> >> does
> > > > > > > > > > > >> >> >>>> all
> > > > > > > > > > > >> >> >>>>>>> these
> > > > > > > > > > > >> >> >>>>>>>>> things inside its #runProgram method,
> > which
> > > > > > > > introduces
> > > > > > > > > a
> > > > > > > > > > > lot
> > > > > > > > > > > >> >> >> of
> > > > > > > > > > > >> >> >>>>>>>> subclasses
> > > > > > > > > > > >> >> >>>>>>>>> of (stream) execution environment.
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> Actually, it sets up an exec env that
> > hijacks
> > > > > the
> > > > > > > > > > > >> >> >>>>>> #execute/executePlan
> > > > > > > > > > > >> >> >>>>>>>>> method, initializes the job graph and
> > abort
> > > > > > > > execution.
> > > > > > > > > > And
> > > > > > > > > > > >> >> >> then
> > > > > > > > > > > >> >> >>>>>>>>> control flow back to CliFrontend, it
> > deploys
> > > > > the
> > > > > > > > > > cluster(or
> > > > > > > > > > > >> >> >>>>> retrieve
> > > > > > > > > > > >> >> >>>>>>>>> the client) and submits the job graph.
> > This
> > > > is
> > > > > > quite
> > > > > > > > a
> > > > > > > > > > > >> >> >> specific
> > > > > > > > > > > >> >> >>>>>>> internal
> > > > > > > > > > > >> >> >>>>>>>>> process inside Flink and none of
> > consistency
> > > > to
> > > > > > > > > anything.
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> 2) Deployment of job cluster couples
> job
> > > > graph
> > > > > > > > creation
> > > > > > > > > > and
> > > > > > > > > > > >> >> >>>> cluster
> > > > > > > > > > > >> >> >>>>>>>>> deployment. Abstractly, from user job
> to
> > a
> > > > > > concrete
> > > > > > > > > > > >> >> >> submission,
> > > > > > > > > > > >> >> >>>> it
> > > > > > > > > > > >> >> >>>>>>>> requires
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>     create JobGraph --\
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> create ClusterClient -->  submit
> JobGraph
> > > > > > > > > > > >> >> >>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>> such a dependency. ClusterClient was
> > created
> > > > by
> > > > > > > > > deploying
> > > > > > > > > > > or
> > > > > > > > > > > >> >> >>>>>>> retrieving.
> > > > > > > > > > > >> >> >>>>>>>>> JobGraph submission requires a compiled
> > > > > JobGraph
> > > > > > and
valid ClusterClient, but the creation of a ClusterClient is abstractly
independent of that of a JobGraph. However, in job cluster mode, we deploy
a job cluster with a job graph, which means we use another process:

create JobGraph --> deploy cluster with the JobGraph

Here is another inconsistency, and downstream projects/client APIs are
forced to handle different cases with little support from Flink.

Since we likely reached a consensus on

1. all configs gathered by Flink configuration and passed
2. the execution environment knows all configs and handles execution (both
   deployment and submission)

I propose eliminating the inconsistencies above by the following approach:

1) CliFrontend should exactly be a front end, at least for the "run"
command. That means it just gathers and passes all config from the command
line to the main method of the user program. The execution environment
knows all the info and, with an addition to utils for ClusterClient, we
gracefully get a ClusterClient by deploying or retrieving. In this way, we
don't need to hijack the #execute/#executePlan methods and can remove
various hacking subclasses of exec env, as well as the #run methods in
ClusterClient (for an interface-ized ClusterClient). Now the control flow
goes from CliFrontend to the main method and never returns.

2) A job cluster means a cluster for a specific job. From another
perspective, it is an ephemeral session. We may decouple the deployment
from a compiled job graph, but start a session with an idle timeout and
submit the job afterwards (see the sketch below).

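For illustration only, a minimal sketch of the "ephemeral session" idea
from the environment's point of view. FlinkConf, ClusterDeployer and
SessionHandle are hypothetical placeholders, not existing Flink API:

// Hypothetical sketch: deployment is decoupled from the compiled JobGraph;
// a "job cluster" is just a session that shuts down once it becomes idle.
public class EphemeralSessionSketch {

    public JobID runOnEphemeralSession(FlinkConf conf, JobGraph jobGraph) throws Exception {
        // 1) Deploy (or retrieve) a session purely from configuration,
        //    without knowing anything about the job yet.
        ClusterDeployer deployer = ClusterDeployer.fromConf(conf);
        SessionHandle session = deployer.deploySession(conf.idleTimeout());

        // 2) Submit the compiled JobGraph afterwards; the session tears
        //    itself down after the idle timeout once the job finishes.
        try (ClusterClient client = session.getClusterClient()) {
            return client.submitJob(jobGraph);
        }
    }
}
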
These topics are better to be aware of and discussed for a consensus
before we go into more details on design or implementation.

Best,
tison.

On Thu, Jun 20, 2019 at 3:21 AM Zili Chen <wander4...@gmail.com> wrote:

Hi Jeff,

Thanks for raising this thread and the design document!

As @Thomas Weise mentioned above, extending config to Flink requires far
more effort than it should. Another example is that we achieve detached
mode by introducing another execution environment which also hijacks the
#execute method.

I agree with your idea that the user would configure all things and Flink
"just" respects them. On this topic I think the unusual control flow when
CliFrontend handles the "run" command is the problem. It handles several
configs, mainly about cluster settings, and thus the main method of the
user program is unaware of them. Also it compiles the app to a job graph by
running the main method with a hijacked exec env, which constrains the main
method further.

I'd like to write down a few notes on passing and respecting configs/args,
as well as on decoupling job compilation and submission, and share them on
this thread later.

Best,
tison.

On Mon, Jun 17, 2019 at 7:29 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:

Hi Jeff and Flavio,

Thanks Jeff a lot for proposing the design document.

We are also working on refactoring ClusterClient to allow flexible and
efficient job management in our real-time platform. We would like to draft
a document to share our ideas with you.

I think it's a good idea to have something like Apache Livy for Flink, and
the efforts discussed here will take a great step forward to it.

Regards,
Xiaogang

On Mon, Jun 17, 2019 at 7:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Is there any possibility to have something like Apache Livy [1] also for
Flink in the future?

[1] https://livy.apache.org/

On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:

>>>> Any API we expose should not have dependencies on the runtime
>>>> (flink-runtime) package or other implementation details. To me, this
>>>> means that the current ClusterClient cannot be exposed to users
>>>> because it uses quite some classes from the optimiser and runtime
>>>> packages.

We should change ClusterClient from a class to an interface.
ExecutionEnvironment would only use the ClusterClient interface, which
should live in flink-clients, while the concrete implementation class could
be in flink-runtime.

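For illustration, a minimal sketch of what such an interface split could
look like. The method set below is hypothetical and is only meant to show
the module boundary, not a concrete proposal:

// flink-clients: the API surface downstream projects would compile against.
// (Sketch only; method names and signatures are illustrative.)
public interface ClusterClient extends AutoCloseable {
    JobID submitJob(JobGraph jobGraph) throws Exception;   // non-blocking submission
    JobStatus getJobStatus(JobID jobId) throws Exception;
    void cancelJob(JobID jobId) throws Exception;
    String triggerSavepoint(JobID jobId, String targetDirectory) throws Exception;
}

// flink-runtime (or a deployment-specific module): the concrete
// implementation, e.g. a REST-based client talking to the cluster endpoint.
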
>>>> What happens when a failure/restart in the client happens? There needs
>>>> to be a way of re-establishing the connection to the job, setting up
>>>> the listeners again, etc.

Good point. First we need to define what a failure/restart in the client
means. IIUC, that usually means a network failure, which will happen in the
RestClient class. If my understanding is correct, the restart/retry
mechanism should be done in RestClient.

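As an aside, a retry layer of this kind is fairly generic. A self-contained
sketch of what a retry-with-backoff helper could look like (this is not
Flink's actual RestClient logic, just an illustration of where such a
mechanism could live):

import java.util.concurrent.Callable;

public final class RetryingCall {

    /** Retries a call a fixed number of times with exponential backoff. */
    public static <T> T callWithRetry(Callable<T> call, int maxAttempts, long initialBackoffMs)
            throws Exception {
        Exception lastFailure = null;
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                lastFailure = e;               // e.g. a transient network failure
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);     // back off before the next attempt
                    backoff *= 2;
                }
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        // Usage example with a dummy call standing in for a REST request.
        String response = callWithRetry(() -> "OK", 3, 100);
        System.out.println(response);
    }
}
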
On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:

Some points to consider:

* Any API we expose should not have dependencies on the runtime
(flink-runtime) package or other implementation details. To me, this means
that the current ClusterClient cannot be exposed to users because it uses
quite some classes from the optimiser and runtime packages.

* What happens when a failure/restart in the client happens? There needs to
be a way of re-establishing the connection to the job, setting up the
listeners again, etc.

Aljoscha

On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:

Sorry folks, the design doc is late as you expected. Here's the design doc
I drafted, welcome any comments and feedback.

https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing

On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:

Nice that this discussion is happening.

In the FLIP, we could also revisit the entire role of the environments
again.

Initially, the idea was:
  - the environments take care of the specific setup for standalone (no
    setup needed), yarn, mesos, etc.
  - the session ones have control over the session. The environment holds
    the session client.
  - running a job gives a "control" object for that job. That behavior is
    the same in all environments.

The actual implementation diverged quite a bit from that. Happy to see a
discussion about straightening this out a bit more.

On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi folks,

Sorry for the late response. It seems we have reached a consensus on this;
I will create a FLIP for this with a more detailed design.

On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:

Great to see this discussion seeded! The problems you face with the
Zeppelin integration are also affecting other downstream projects, like
Beam.

We just enabled the savepoint restore option in RemoteStreamEnvironment [1]
and that was more difficult than it should be. The main issue is that the
environment and the cluster client aren't decoupled. Ideally it should be
possible to just get the matching cluster client from the environment and
then control the job through it (environment as factory for cluster
client). But note that the environment classes are part of the public API,
and it is not straightforward to make larger changes without breaking
backward compatibility.

ClusterClient currently exposes internal classes like JobGraph and
StreamGraph. But it should be possible to wrap this with a new public API
that brings the required job control capabilities for downstream projects.
Perhaps it is helpful to look at some of the interfaces in Beam while
thinking about this: [2] for the portable job API and [3] for the old
asynchronous job control from the Beam Java SDK.

The backward compatibility discussion [4] is also relevant here. A new API
should shield downstream projects from internals and allow them to
interoperate with multiple future Flink versions in the same release line
without forced upgrades.

Thanks,
Thomas

[1] https://github.com/apache/flink/pull/7249
[2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
[3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
[4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E

On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:

>>>> I'm not so sure whether the user should be able to define where the
>>>> job runs (in your example Yarn). This is actually independent of the
>>>> job development and is something which is decided at deployment time.

Users don't need to specify the execution mode programmatically. They can
also pass the execution mode via the arguments of the flink run command,
e.g.

bin/flink run -m yarn-cluster ....
bin/flink run -m local ...
bin/flink run -m host:port ...

Does this make sense to you?

>>>> To me it makes sense that the ExecutionEnvironment is not directly
>>>> initialized by the user and instead context sensitive how you want to
>>>> execute your job (Flink CLI vs. IDE, for example).

Right, currently I notice Flink would create a different
ContextExecutionEnvironment based on the submission scenario (Flink CLI vs
IDE). To me this is kind of a hack, not so straightforward. What I
suggested above is that Flink should always create the same
ExecutionEnvironment but with different configuration, and based on that
configuration it would create the proper ClusterClient for the different
behaviors (see the sketch below).

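For illustration, a minimal sketch of that idea. The configuration key and
the client classes below are placeholders, not existing Flink API or its
actual constructors:

// Hypothetical sketch: one environment, the configuration decides the client.
public final class ClusterClientFactory {

    /** Picks a client implementation purely from configuration. */
    public static ClusterClient create(Configuration conf) {
        // "execution.target" is used here as an illustrative key only.
        String target = conf.getString("execution.target", "local");
        switch (target) {
            case "local":        return new LocalClusterClient(conf);  // in-JVM mini cluster
            case "yarn-cluster": return new YarnClusterClient(conf);   // deploy on YARN
            default:             return new RestClusterClient(conf);   // host:port session
        }
    }
}
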
On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:

You are probably right that we have code duplication when it comes to the
creation of the ClusterClient. This should be reduced in the future.

I'm not so sure whether the user should be able to define where the job
runs (in your example Yarn). This is actually independent of the job
development and is something which is decided at deployment time. To me it
makes sense that the ExecutionEnvironment is not directly initialized by
the user and is instead sensitive to the context in which you want to
execute your job (Flink CLI vs. IDE, for example). However, I agree that
the ExecutionEnvironment should give you access to the ClusterClient and to
the job (maybe in the form of the JobGraph or a job plan).

Cheers,
Till

On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Till,

Thanks for the feedback. You are right that I expect a better programmatic
job submission/control API which could be used by downstream projects, and
it would benefit the Flink ecosystem. When I look at the code of the Flink
scala-shell and sql-client (I believe they are not the core of Flink, but
belong to its ecosystem), I find a lot of duplicated code for creating a
ClusterClient from user-provided configuration (the configuration format
may differ between scala-shell and sql-client) and then using that
ClusterClient to manipulate jobs. I don't think this is convenient for
downstream projects. What I expect is that a downstream project only needs
to provide the necessary configuration info (maybe introducing a class
FlinkConf), then build an ExecutionEnvironment based on this FlinkConf, and
the ExecutionEnvironment will create the proper ClusterClient. This would
not only benefit downstream project development but also help their
integration tests with Flink. Here's one sample code snippet of what I
expect:

val conf = new FlinkConf().mode("yarn")
val env = new ExecutionEnvironment(conf)
val jobId = env.submit(...)
val jobStatus = env.getClusterClient().queryJobStatus(jobId)
env.getClusterClient().cancelJob(jobId)

What do you think?

On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Jeff,

What you are proposing is to provide the user with better programmatic job
control. There was actually an effort to achieve this but it has never been
completed [1]. However, there are some improvements in the code base now.
Look for example at the NewClusterClient interface which offers a
non-blocking job submission. But I agree that we need to improve Flink in
this regard.

I would not be in favour of exposing all ClusterClient calls via the
ExecutionEnvironment because it would clutter the class and would not be a
good separation of concerns. Instead one idea could be to retrieve the
current ClusterClient from the ExecutionEnvironment, which can then be used
for cluster and job control. But before we start an effort here, we need to
agree on and capture what functionality we want to provide.

Initially, the idea was that we have the ClusterDescriptor describing how
to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can
be used for deploying Flink clusters (job and session) and gives you a
ClusterClient. The ClusterClient controls the cluster (e.g. submitting
jobs, listing all running jobs). And then there was the idea to introduce a
JobClient, which you obtain from the ClusterClient, to trigger job-specific
operations (e.g. taking a savepoint, cancelling the job).

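To make the intended layering concrete, a rough sketch of the three levels.
The methods shown are illustrative only and are not the actual signatures
of these classes:

// Level 1: knows how to talk to a cluster manager (YARN, Mesos, standalone).
interface ClusterDescriptor {
    ClusterClient deploySessionCluster();          // deploy and return a client
    ClusterClient deployJobCluster(JobGraph job);  // per-job deployment
    ClusterClient retrieve(String clusterId);      // attach to a running cluster
}

// Level 2: controls one cluster.
interface ClusterClient {
    JobClient submitJob(JobGraph jobGraph);
    Collection<JobID> listJobs();
}

// Level 3: controls one job.
interface JobClient {
    JobStatus getJobStatus();
    String triggerSavepoint(String targetDirectory);
    void cancel();
}
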
[1] https://issues.apache.org/jira/browse/FLINK-4272

Cheers,
Till

On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Folks,

I am trying to integrate Flink into Apache Zeppelin, which is an
interactive notebook, and I hit several issues that are caused by the Flink
client API. So I'd like to propose the following changes to the Flink
client API.

1. Support nonblocking execution. Currently, ExecutionEnvironment#execute
is a blocking method which does two things: first submit the job and then
wait for the job until it is finished. I'd like to introduce a nonblocking
execution method like ExecutionEnvironment#submit which only submits the
job and then returns the jobId to the client, and allow the user to query
the job status via that jobId.

2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment.
Currently the only way to cancel a job is via the CLI (bin/flink), which is
not convenient for downstream projects to use. So I'd like to add a cancel
API in ExecutionEnvironment.

3. Add a savepoint API in ExecutionEnvironment/StreamExecutionEnvironment.
It is similar to the cancel API; we should use ExecutionEnvironment as the
unified API for third parties to integrate with Flink.

4. Add a listener for the job execution lifecycle. Something like the
following, so that downstream projects can run custom logic in the
lifecycle of a job. E.g. Zeppelin would capture the jobId after the job is
submitted and then use this jobId to cancel it later when necessary (see
the usage sketch after the interface).

public interface JobListener {

  void onJobSubmitted(JobID jobId);

  void onJobExecuted(JobExecutionResult jobResult);

  void onJobCanceled(JobID jobId);
}

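A minimal sketch of how a downstream project such as Zeppelin could use
such a listener, assuming the proposed (not yet existing)
ExecutionEnvironment#registerJobListener, #submit and #cancel methods:

// Hypothetical usage sketch for the proposed listener and nonblocking submit.
public class ZeppelinStyleJobControl {

    private volatile JobID lastSubmittedJob;

    public void runAndRemember(ExecutionEnvironment env) throws Exception {
        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobID jobId) {
                lastSubmittedJob = jobId;   // remember the job for later control
            }

            @Override
            public void onJobExecuted(JobExecutionResult jobResult) { }

            @Override
            public void onJobCanceled(JobID jobId) { }
        });

        env.submit();   // proposed nonblocking submit: returns once the job is submitted
    }

    public void cancelIfRunning(ExecutionEnvironment env) throws Exception {
        if (lastSubmittedJob != null) {
            env.cancel(lastSubmittedJob);   // proposed cancel API
        }
    }
}
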
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 5. Enable session in
> > > > > > > > > ExecutionEnvironment.
> > > > > > > > > > > >> >> >>>>>> Currently
> > > > > > > > > > > >> >> >>>>>>> it
> > > > > > > > > > > >> >> >>>>>>>>>>> is
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> disabled,
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> but
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> session is very
> convenient
> > for
> > > > > > third
> > > > > > > > > party
> > > > > > > > > > > >> >> >> to
> > > > > > > > > > > >> >> >>>>>>>> submitting
> > > > > > > > > > > >> >> >>>>>>>>>>> jobs
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> continually.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> I hope flink can enable
> it
> > > > again.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> 6. Unify all flink client api into
> ExecutionEnvironment/StreamExecutionEnvironment.
>
> This is a long term issue which needs more careful thinking and design.
> Currently some features of flink are exposed in
> ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in
> the cli instead of the api, like the cancel and savepoint operations I
> mentioned above. I think the root cause is that flink hasn't unified the
> way these operations interact with the cluster. Here I list 3 scenarios
> of flink operation:
>
>   - Local job execution. Flink will create a LocalEnvironment and then use
>     this LocalEnvironment to create a LocalExecutor for job execution.
>   - Remote job execution. Flink will create a ClusterClient first, then
>     create a ContextEnvironment based on that ClusterClient, and then run
>     the job.
>   - Job cancelation. Flink will create a ClusterClient first and then
>     cancel the job via this ClusterClient.
>
> As you can see from the above 3 scenarios, Flink doesn't use the same
> approach (code path) to interact with the cluster.
>
> What I propose is the following:
> Create the proper LocalEnvironment/RemoteEnvironment (based on the user
> configuration) --> use this Environment to create the proper ClusterClient
> (LocalClusterClient or RestClusterClient) to interact with Flink (job
> execution or cancelation).
>
> This way we can unify the process of local execution and remote execution.
> It also becomes much easier for third parties to integrate with flink,
> because ExecutionEnvironment is the unified entry point for flink. All a
> third party needs to do is pass configuration to ExecutionEnvironment, and
> ExecutionEnvironment will do the right thing based on that configuration.
> The flink cli can also be considered a flink api consumer: it just passes
> the configuration to ExecutionEnvironment and lets ExecutionEnvironment
> create the proper ClusterClient, instead of creating the ClusterClient
> directly itself.
>
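[Editor's illustration] A minimal sketch of that idea: a single entry point receives a Configuration and decides internally whether to go down the local or the remote path. The names here (UnifiedEnvironment, the "execution.target" key) are made up for illustration and are not existing Flink API; only org.apache.flink.configuration.Configuration is real.

import org.apache.flink.configuration.Configuration;

public final class UnifiedEnvironmentSketch {

    // Hypothetical minimal environment abstraction, for illustration only.
    interface UnifiedEnvironment {
        void execute(String jobName) throws Exception;
    }

    // Assumed configuration key; not an existing Flink option.
    private static final String TARGET_KEY = "execution.target";

    static UnifiedEnvironment create(Configuration config) {
        String target = config.getString(TARGET_KEY, "local");
        if ("local".equals(target)) {
            // Would internally be backed by a LocalClusterClient.
            return jobName -> System.out.println("local execution of " + jobName);
        } else {
            // Would internally be backed by a RestClusterClient for the
            // configured remote cluster.
            return jobName -> System.out.println("remote execution of " + jobName);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString(TARGET_KEY, "local");
        // The cli, Zeppelin, or any other consumer only builds the
        // configuration; the environment chooses the right client.
        create(config).execute("WordCount");
    }
}

The point of the sketch is that only the environment would ever touch a ClusterClient; every consumer, including the cli, would go through the same configuration-driven path.
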
> Point 6 would involve a large code refactoring, so I think we can defer it
> to a future release; 1, 2, 3, 4 and 5 could be done at once, I believe.
> Let me know your comments and feedback, thanks.
>
> --
> Best Regards
>
> Jeff Zhang
