Great idea Tison!

I will create the umbrella issue and post it here so that we are all
on the same page!

Cheers,
Kostas

On Wed, Sep 4, 2019 at 11:36 AM Zili Chen <wander4...@gmail.com> wrote:
>
> Hi Kostas & Aljoscha,
>
> I noticed that there is a JIRA (FLINK-13946) which could be included
> in this refactoring thread. Since we agree on most of the directions in
> the big picture, would it be reasonable to create an umbrella issue for
> refactoring the client APIs, with linked subtasks? That would be a better
> way to join the forces of our community.
>
> Best,
> tison.
>
>
> Zili Chen <wander4...@gmail.com> wrote on Sat, Aug 31, 2019 at 12:52 PM:
>>
>> Great Kostas! Looking forward to your POC!
>>
>> Best,
>> tison.
>>
>>
>> Jeff Zhang <zjf...@gmail.com> wrote on Fri, Aug 30, 2019 at 11:07 PM:
>>>
>>> Awesome, @Kostas! Looking forward to your POC.
>>>
>>> Kostas Kloudas <kklou...@gmail.com> wrote on Fri, Aug 30, 2019 at 8:33 PM:
>>>
>>> > Hi all,
>>> >
>>> > I am just writing here to let you know that I am working on a POC that
>>> > tries to refactor the current state of job submission in Flink.
>>> > I want to stress that it introduces NO CHANGES to the current
>>> > behaviour of Flink. It just re-arranges things and introduces the
>>> > notion of an Executor, which is the entity responsible for taking the
>>> > user-code and submitting it for execution.
>>> >
>>> > Given this, the discussion about the functionality that the JobClient
>>> > will expose to the user can go on independently and the same
>>> > holds for all the open questions so far.
>>> >
>>> > I hope I will have some more news to share soon.
>>> >
>>> > Thanks,
>>> > Kostas
>>> >
>>> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>> > >
>>> > > Hi Zili,
>>> > >
>>> > > It makes sense to me that a dedicated cluster is started for a per-job
>>> > > cluster and will not accept more jobs.
>>> > > I just have a question about the command line.
>>> > >
>>> > > Currently we could use the following commands to start different
>>> > clusters.
>>> > > *per-job cluster*
>>> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
>>> > > examples/streaming/WindowJoin.jar
>>> > > *session cluster*
>>> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
>>> > > examples/streaming/WindowJoin.jar
>>> > >
>>> > > What will it look like after client enhancement?
>>> > >
>>> > >
>>> > > Best,
>>> > > Yang
>>> > >
>>> > > Zili Chen <wander4...@gmail.com> wrote on Fri, Aug 23, 2019 at 10:46 PM:
>>> > >
>>> > > > Hi Till,
>>> > > >
>>> > > > Thanks for your update. Nice to hear :-)
>>> > > >
>>> > > > Best,
>>> > > > tison.
>>> > > >
>>> > > >
>>> > > > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Aug 23, 2019 at 10:39 PM:
>>> > > >
>>> > > > > Hi Tison,
>>> > > > >
>>> > > > > just a quick comment concerning the class loading issues when using
>>> > the
>>> > > > per
>>> > > > > job mode. The community wants to change it so that the
>>> > > > > StandaloneJobClusterEntryPoint actually uses the user code class
>>> > loader
>>> > > > > with child first class loading [1]. Hence, I hope that this problem
>>> > will
>>> > > > be
>>> > > > > resolved soon.
>>> > > > >
>>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
>>> > > > >
>>> > > > > Cheers,
>>> > > > > Till
>>> > > > >
>>> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas <kklou...@gmail.com>
>>> > > > wrote:
>>> > > > >
>>> > > > > > Hi all,
>>> > > > > >
>>> > > > > > On the topic of web submission, I agree with Till that it only
>>> > seems
>>> > > > > > to complicate things.
>>> > > > > > It is bad for security, job isolation (anybody can submit/cancel
>>> > jobs),
>>> > > > > > and its
>>> > > > > > implementation complicates some parts of the code. So, if we were
>>> > > > > > to redesign the
>>> > > > > > WebUI, maybe this part could be left out. In addition, I would say
>>> > > > > > that the ability to cancel
>>> > > > > > jobs could also be left out.
>>> > > > > >
>>> > > > > > I would also be in favour of removing the "detached" mode,
>>> > > > > > for
>>> > > > > > the reasons mentioned
>>> > > > > > above (i.e. because now we will have a future representing the
>>> > result
>>> > > > > > on which the user
>>> > > > > > can choose to wait or not).
>>> > > > > >
>>> > > > > > Now, regarding separating job submission and cluster creation, I am
>>> > > > > > in
>>> > > > > > favour of keeping both.
>>> > > > > > Once again, the reasons are mentioned above by Stephan, Till,
>>> > Aljoscha
>>> > > > > > and also Zili seems
>>> > > > > > to agree. They mainly have to do with security, isolation and ease
>>> > of
>>> > > > > > resource management
>>> > > > > > for the user as he knows that "when my job is done, everything
>>> > will be
>>> > > > > > cleared up". This is
>>> > > > > > also the experience you get when launching a process on your local
>>> > OS.
>>> > > > > >
>>> > > > > > On excluding the per-job mode from returning a JobClient or not, I
>>> > > > > > believe that eventually
>>> > > > > > it would be nice to allow users to get back a jobClient. The
>>> > reason is
>>> > > > > > that 1) I cannot
>>> > > > > > find any objective reason why the user-experience should diverge,
>>> > and
>>> > > > > > 2) this will be the
>>> > > > > > way that the user will be able to interact with his running job.
>>> > > > > > Assuming that the necessary
>>> > > > > > ports are open for the REST API to work, then I think that the
>>> > > > > > JobClient can run against the
>>> > > > > > REST API without problems. If the needed ports are not open, then
>>> > we
>>> > > > > > are safe to not return
>>> > > > > > a JobClient, as the user explicitly chose to close all points of
>>> > > > > > communication to his running job.
>>> > > > > >
>>> > > > > > On the topic of not hijacking the "env.execute()" in order to get
>>> > the
>>> > > > > > Plan, I definitely agree but
>>> > > > > > for the proposal of having a "compile()" method in the env, I 
>>> > > > > > would
>>> > > > > > like to have a better look at
>>> > > > > > the existing code.
>>> > > > > >
>>> > > > > > Cheers,
>>> > > > > > Kostas
>>> > > > > >
>>> > > > > > On Fri, Aug 23, 2019 at 5:52 AM Zili Chen <wander4...@gmail.com>
>>> > > > wrote:
>>> > > > > > >
>>> > > > > > > Hi Yang,
>>> > > > > > >
>>> > > > > > > It would be helpful if you check Stephan's last comment,
>>> > > > > > > which states that isolation is important.
>>> > > > > > >
>>> > > > > > > For per-job mode, we run a dedicated cluster (maybe it
>>> > > > > > > should have been a couple of JMs and TMs during the FLIP-6
>>> > > > > > > design) for a specific job. Thus the process is isolated
>>> > > > > > > from other jobs.
>>> > > > > > >
>>> > > > > > > In our case, there was a time we suffered from multiple
>>> > > > > > > jobs submitted by different users affecting each other
>>> > > > > > > so that all of them ran into an error state. Also,
>>> > > > > > > running the client inside the cluster could save client
>>> > > > > > > resources at some point.
>>> > > > > > >
>>> > > > > > > However, we also face several issues, as you mentioned:
>>> > > > > > > in per-job mode it always uses the parent classloader,
>>> > > > > > > so classloading issues occur.
>>> > > > > > >
>>> > > > > > > BTW, one can make an analogy between session/per-job mode
>>> > > > > > > in Flink and client/cluster mode in Spark.
>>> > > > > > >
>>> > > > > > > Best,
>>> > > > > > > tison.
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > Yang Wang <danrtsey...@gmail.com> wrote on Thu, Aug 22, 2019 at 11:25 AM:
>>> > > > > > >
>>> > > > > > > > From the user's perspective, the scope of a per-job
>>> > > > > > > > cluster is really confusing.
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > If it means a Flink cluster with a single job, so that we could
>>> > > > > > > > get better isolation, then it does not matter how we deploy the
>>> > > > > > > > cluster: directly deploy (mode 1), or start a Flink cluster and
>>> > > > > > > > then submit the job through a cluster client (mode 2).
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > Otherwise, if it just means direct deployment, how should we
>>> > > > > > > > name mode 2: session with job, or something else?
>>> > > > > > > >
>>> > > > > > > > We could also benefit from mode 2. Users could get the same
>>> > > > > > > > isolation as with mode 1: the user code and dependencies will
>>> > > > > > > > be loaded by the user classloader to avoid class conflicts
>>> > > > > > > > with the framework.
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > Anyway, both submission modes are useful.
>>> > > > > > > >
>>> > > > > > > > We just need to clarify the concepts.
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > > > Best,
>>> > > > > > > >
>>> > > > > > > > Yang
>>> > > > > > > >
>>> > > > > > > > Zili Chen <wander4...@gmail.com> wrote on Tue, Aug 20, 2019 at 5:58 PM:
>>> > > > > > > >
>>> > > > > > > > > Thanks for the clarification.
>>> > > > > > > > >
>>> > > > > > > > > The idea of a JobDeployer came into my mind when I was
>>> > > > > > > > > puzzling over how to execute per-job mode and session mode
>>> > > > > > > > > with the same user code and framework codepath.
>>> > > > > > > > >
>>> > > > > > > > > With the JobDeployer concept we come back to the statement
>>> > > > > > > > > that the environment knows all the configs of cluster
>>> > > > > > > > > deployment and job submission. We configure, or generate from
>>> > > > > > > > > the configuration, a specific JobDeployer in the environment
>>> > > > > > > > > and then the code aligns on
>>> > > > > > > > >
>>> > > > > > > > > *JobClient client = env.execute().get();*
>>> > > > > > > > >
>>> > > > > > > > > which in session mode is returned by clusterClient.submitJob
>>> > > > > > > > > and in per-job mode by clusterDescriptor.deployJobCluster.
>>> > > > > > > > >
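>>> > > > > > > > > As a rough illustration only (a sketch of the proposed
>>> > > > > > > > > user-facing pattern discussed in this thread, not the current
>>> > > > > > > > > Flink API):
>>> > > > > > > > >
>>> > > > > > > > > // the same program, unchanged, in session mode and per-job mode;
>>> > > > > > > > > // only the JobDeployer configured behind the environment differs
>>> > > > > > > > > StreamExecutionEnvironment env =
>>> > > > > > > > >     StreamExecutionEnvironment.getExecutionEnvironment();
>>> > > > > > > > > env.fromElements(1, 2, 3).print();
>>> > > > > > > > >
>>> > > > > > > > > // execute() submits asynchronously and returns a future;
>>> > > > > > > > > // the caller decides whether to block on it
>>> > > > > > > > > CompletableFuture<JobClient> submission = env.execute();
>>> > > > > > > > > JobClient client = submission.get();
>>> > > > > > > > >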
>>> > > > > > > > > Here comes a problem: currently we directly run the
>>> > > > > > > > > ClusterEntrypoint with the extracted job graph. Following the
>>> > > > > > > > > JobDeployer approach, we had better align the entry point of
>>> > > > > > > > > per-job deployment at the JobDeployer. Users run their main
>>> > > > > > > > > method, or a CLI (which finally calls the main method), to
>>> > > > > > > > > deploy the job cluster.
>>> > > > > > > > >
>>> > > > > > > > > Best,
>>> > > > > > > > > tison.
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > > Stephan Ewen <se...@apache.org> wrote on Tue, Aug 20, 2019 at 4:40 PM:
>>> > > > > > > > >
>>> > > > > > > > > > Till has made some good comments here.
>>> > > > > > > > > >
>>> > > > > > > > > > Two things to add:
>>> > > > > > > > > >
>>> > > > > > > > > >   - The job mode is very nice in the way that it runs the
>>> > > > client
>>> > > > > > inside
>>> > > > > > > > > the
>>> > > > > > > > > > cluster (in the same image/process that is the JM) and 
>>> > > > > > > > > > thus
>>> > > > > unifies
>>> > > > > > > > both
>>> > > > > > > > > > applications and what the Spark world calls the "driver
>>> > mode".
>>> > > > > > > > > >
>>> > > > > > > > > >   - Another thing I would add is that during the FLIP-6
>>> > design,
>>> > > > > we
>>> > > > > > were
>>> > > > > > > > > > thinking about setups where Dispatcher and JobManager are
>>> > > > > separate
>>> > > > > > > > > > processes.
>>> > > > > > > > > >     A Yarn or Mesos Dispatcher of a session could run
>>> > > > > independently
>>> > > > > > > > (even
>>> > > > > > > > > > as privileged processes executing no code).
>>> > > > > > > > > >     Then the "per-job" mode could still be helpful:
>>> > when a
>>> > > > > job
>>> > > > > > is
>>> > > > > > > > > > submitted to the dispatcher, it launches the JM again in a
>>> > > > > per-job
>>> > > > > > > > mode,
>>> > > > > > > > > so
>>> > > > > > > > > > > that JM and TM processes are bound to the job only. For
>>> > higher
>>> > > > > > security
>>> > > > > > > > > > setups, it is important that processes are not reused
>>> > across
>>> > > > > jobs.
>>> > > > > > > > > >
>>> > > > > > > > > > On Tue, Aug 20, 2019 at 10:27 AM Till Rohrmann <
>>> > > > > > trohrm...@apache.org>
>>> > > > > > > > > > wrote:
>>> > > > > > > > > >
>>> > > > > > > > > > > I would not be in favour of getting rid of the per-job
>>> > mode
>>> > > > > > since it
>>> > > > > > > > > > > simplifies the process of running Flink jobs
>>> > considerably.
>>> > > > > > Moreover,
>>> > > > > > > > it
>>> > > > > > > > > > is
>>> > > > > > > > > > > not only well suited for container deployments but also
>>> > for
>>> > > > > > > > deployments
>>> > > > > > > > > > > where you want to guarantee job isolation. For example, 
>>> > > > > > > > > > > a
>>> > > > user
>>> > > > > > could
>>> > > > > > > > > use
>>> > > > > > > > > > > the per-job mode on Yarn to execute his job on a 
>>> > > > > > > > > > > separate
>>> > > > > > cluster.
>>> > > > > > > > > > >
>>> > > > > > > > > > > I think that having two notions of cluster deployments
>>> > > > (session
>>> > > > > > vs.
>>> > > > > > > > > > per-job
>>> > > > > > > > > > > mode) does not necessarily contradict your ideas for the
>>> > > > client
>>> > > > > > api
>>> > > > > > > > > > > refactoring. For example one could have the following
>>> > > > > interfaces:
>>> > > > > > > > > > >
>>> > > > > > > > > > > - ClusterDeploymentDescriptor: encapsulates the logic
>>> > how to
>>> > > > > > deploy a
>>> > > > > > > > > > > cluster.
>>> > > > > > > > > > > - ClusterClient: allows to interact with a cluster
>>> > > > > > > > > > > - JobClient: allows to interact with a running job
>>> > > > > > > > > > >
>>> > > > > > > > > > > Now the ClusterDeploymentDescriptor could have two
>>> > methods:
>>> > > > > > > > > > >
>>> > > > > > > > > > > - ClusterClient deploySessionCluster()
>>> > > > > > > > > > > - JobClusterClient/JobClient
>>> > deployPerJobCluster(JobGraph)
>>> > > > > > > > > > >
>>> > > > > > > > > > > where JobClusterClient is either a supertype of
>>> > ClusterClient
>>> > > > > > which
>>> > > > > > > > > does
>>> > > > > > > > > > > not give you the functionality to submit jobs or
>>> > > > > > deployPerJobCluster
>>> > > > > > > > > > > returns directly a JobClient.
>>> > > > > > > > > > >
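>>> > > > > > > > > > > A rough sketch of the latter variant (deployPerJobCluster
>>> > > > > > > > > > > returning a JobClient directly); anything beyond the two
>>> > > > > > > > > > > methods listed above is purely illustrative, not an agreed API:
>>> > > > > > > > > > >
>>> > > > > > > > > > > interface JobClient {
>>> > > > > > > > > > >     // interact with a single running job
>>> > > > > > > > > > >     CompletableFuture<JobResult> requestJobResult();
>>> > > > > > > > > > >     CompletableFuture<Void> cancel();
>>> > > > > > > > > > > }
>>> > > > > > > > > > >
>>> > > > > > > > > > > interface ClusterClient {
>>> > > > > > > > > > >     // interact with a cluster, e.g. submit a job to a session
>>> > > > > > > > > > >     JobClient submitJob(JobGraph jobGraph);
>>> > > > > > > > > > > }
>>> > > > > > > > > > >
>>> > > > > > > > > > > interface ClusterDeploymentDescriptor {
>>> > > > > > > > > > >     // encapsulates the logic of how to deploy a cluster
>>> > > > > > > > > > >     ClusterClient deploySessionCluster();
>>> > > > > > > > > > >     JobClient deployPerJobCluster(JobGraph jobGraph);
>>> > > > > > > > > > > }
>>> > > > > > > > > > >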
>>> > > > > > > > > > > When setting up the ExecutionEnvironment, one would then
>>> > not
>>> > > > > > provide
>>> > > > > > > > a
>>> > > > > > > > > > > ClusterClient to submit jobs but a JobDeployer which,
>>> > > > depending
>>> > > > > > on
>>> > > > > > > > the
>>> > > > > > > > > > > selected mode, either uses a ClusterClient (session
>>> > mode) to
>>> > > > > > submit
>>> > > > > > > > > jobs
>>> > > > > > > > > > or
>>> > > > > > > > > > > a ClusterDeploymentDescriptor to deploy a per-job mode
>>> > > > cluster
>>> > > > > > with
>>> > > > > > > > the
>>> > > > > > > > > > job
>>> > > > > > > > > > > to execute.
>>> > > > > > > > > > >
>>> > > > > > > > > > > These are just some thoughts how one could make it
>>> > working
>>> > > > > > because I
>>> > > > > > > > > > > believe there is some value in using the per job mode
>>> > from
>>> > > > the
>>> > > > > > > > > > > ExecutionEnvironment.
>>> > > > > > > > > > >
>>> > > > > > > > > > > Concerning the web submission, this is indeed a bit
>>> > tricky.
>>> > > > > From
>>> > > > > > a
>>> > > > > > > > > > cluster
>>> > > > > > > > > > > management stand point, I would in favour of not
>>> > executing
>>> > > > user
>>> > > > > > code
>>> > > > > > > > on
>>> > > > > > > > > > the
>>> > > > > > > > > > > REST endpoint. Especially when considering security, it
>>> > would
>>> > > > > be
>>> > > > > > good
>>> > > > > > > > > to
>>> > > > > > > > > > > have a well defined cluster behaviour where it is
>>> > explicitly
>>> > > > > > stated
>>> > > > > > > > > where
>>> > > > > > > > > > > user code and, thus, potentially risky code is executed.
>>> > > > > Ideally
>>> > > > > > we
>>> > > > > > > > > limit
>>> > > > > > > > > > > it to the TaskExecutor and JobMaster.
>>> > > > > > > > > > >
>>> > > > > > > > > > > Cheers,
>>> > > > > > > > > > > Till
>>> > > > > > > > > > >
>>> > > > > > > > > > > On Tue, Aug 20, 2019 at 9:40 AM Flavio Pompermaier <
>>> > > > > > > > > pomperma...@okkam.it
>>> > > > > > > > > > >
>>> > > > > > > > > > > wrote:
>>> > > > > > > > > > >
>>> > > > > > > > > > > > In my opinion the client should not use any
>>> > environment to
>>> > > > > get
>>> > > > > > the
>>> > > > > > > > > Job
>>> > > > > > > > > > > > graph because the jar should reside ONLY on the 
>>> > > > > > > > > > > > cluster
>>> > > > (and
>>> > > > > > not in
>>> > > > > > > > > the
>>> > > > > > > > > > > > client classpath otherwise there are always
>>> > inconsistencies
>>> > > > > > between
>>> > > > > > > > > > > client
>>> > > > > > > > > > > > and Flink Job manager's classpath).
>>> > > > > > > > > > > > In the YARN, Mesos and Kubernetes scenarios you have
>>> > the
>>> > > > jar
>>> > > > > > but
>>> > > > > > > > you
>>> > > > > > > > > > > could
>>> > > > > > > > > > > > start a cluster that has the jar on the Job Manager as
>>> > well
>>> > > > > > (but
>>> > > > > > > > this
>>> > > > > > > > > > is
>>> > > > > > > > > > > > the only case where I think you can assume that the
>>> > client
>>> > > > > has
>>> > > > > > the
>>> > > > > > > > > jar
>>> > > > > > > > > > on
>>> > > > > > > > > > > > the classpath..in the REST job submission you don't
>>> > have
>>> > > > any
>>> > > > > > > > > > classpath).
>>> > > > > > > > > > > >
>>> > > > > > > > > > > > Thus, always in my opinion, the JobGraph should be
>>> > > > generated
>>> > > > > > by the
>>> > > > > > > > > Job
>>> > > > > > > > > > > > Manager REST API.
>>> > > > > > > > > > > >
>>> > > > > > > > > > > >
>>> > > > > > > > > > > > On Tue, Aug 20, 2019 at 9:00 AM Zili Chen <
>>> > > > > > wander4...@gmail.com>
>>> > > > > > > > > > wrote:
>>> > > > > > > > > > > >
>>> > > > > > > > > > > >> I would like to involve Till & Stephan here to 
>>> > > > > > > > > > > >> clarify
>>> > > > some
>>> > > > > > > > concept
>>> > > > > > > > > of
>>> > > > > > > > > > > >> per-job mode.
>>> > > > > > > > > > > >>
>>> > > > > > > > > > > >> The term per-job is one of modes a cluster could run
>>> > on.
>>> > > > It
>>> > > > > is
>>> > > > > > > > > mainly
>>> > > > > > > > > > > >> aimed
>>> > > > > > > > > > > >> at spawn
>>> > > > > > > > > > > >> a dedicated cluster for a specific job while the job
>>> > could
>>> > > > > be
>>> > > > > > > > > packaged
>>> > > > > > > > > > > >> with
>>> > > > > > > > > > > >> Flink
>>> > > > > > > > > > > >> itself and thus the cluster initialized with job so
>>> > that
>>> > > > get
>>> > > > > > rid
>>> > > > > > > > of
>>> > > > > > > > > a
>>> > > > > > > > > > > >> separated
>>> > > > > > > > > > > >> submission step.
>>> > > > > > > > > > > >>
>>> > > > > > > > > > > >> This is useful for container deployments where one
>>> > create
>>> > > > > his
>>> > > > > > > > image
>>> > > > > > > > > > with
>>> > > > > > > > > > > >> the job
>>> > > > > > > > > > > >> and then simply deploy the container.
>>> > > > > > > > > > > >>
>>> > > > > > > > > > > >> However, it is out of client scope since a
>>> > > > > > client(ClusterClient
>>> > > > > > > > for
>>> > > > > > > > > > > >> example) is for
>>> > > > > > > > > > > >> communicating with an existing cluster and performing
>>> > > > > actions.
>>> > > > > > > > > > Currently,
>>> > > > > > > > > > > >> in
>>> > > > > > > > > > > >> per-job
>>> > > > > > > > > > > >> mode, we extract the job graph and bundle it into
>>> > cluster
>>> > > > > > > > deployment
>>> > > > > > > > > > and
>>> > > > > > > > > > > >> thus no
>>> > > > > > > > > > > >> concept of client gets involved. It looks
>>> > reasonable
>>> > > > to
>>> > > > > > > > exclude
>>> > > > > > > > > > the
>>> > > > > > > > > > > >> deployment
>>> > > > > > > > > > > >> of per-job cluster from client api and use dedicated
>>> > > > utility
>>> > > > > > > > > > > >> classes(deployers) for
>>> > > > > > > > > > > >> deployment.
>>> > > > > > > > > > > >>
>>> > > > > > > > > > > >> Zili Chen <wander4...@gmail.com> wrote on Tue, Aug 20, 2019 at 12:37 PM:
>>> > > > > > > > > > > >>
>>> > > > > > > > > > > >> > Hi Aljoscha,
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > Thanks for your reply and participation. The Google
>>> > Doc
>>> > > > you
>>> > > > > > > > linked
>>> > > > > > > > > to
>>> > > > > > > > > > > >> > requires
>>> > > > > > > > > > > >> > permission and I think you could use a share link
>>> > > > instead.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > I agree that we have almost reached a consensus that
>>> > > > > > JobClient is
>>> > > > > > > > > > > >> necessary
>>> > > > > > > > > > > >> > to
>>> > > > > > > > > > > >> > interact with a running Job.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > Let me check your open questions one by one.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > 1. Separate cluster creation and job submission for
>>> > > > > per-job
>>> > > > > > > > mode.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > As you mentioned here is where the opinions
>>> > diverge. In
>>> > > > my
>>> > > > > > > > > document
>>> > > > > > > > > > > >> there
>>> > > > > > > > > > > >> > is
>>> > > > > > > > > > > >> > an alternative[2] that proposes excluding per-job
>>> > > > > deployment
>>> > > > > > > > from
>>> > > > > > > > > > > client
>>> > > > > > > > > > > >> > api
>>> > > > > > > > > > > >> > scope and now I find it is more reasonable we do 
>>> > > > > > > > > > > >> > the
>>> > > > > > exclusion.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > When in per-job mode, a dedicated JobCluster is
>>> > launched
>>> > > > > to
>>> > > > > > > > > execute
>>> > > > > > > > > > > the
>>> > > > > > > > > > > >> > specific job. It is like a Flink Application more
>>> > than a
>>> > > > > > > > > submission
>>> > > > > > > > > > > >> > of Flink Job. Client only takes care of job
>>> > submission
>>> > > > and
>>> > > > > > > > assume
>>> > > > > > > > > > > there
>>> > > > > > > > > > > >> is
>>> > > > > > > > > > > >> > an existing cluster. In this way we are able to
>>> > consider
>>> > > > > > per-job
>>> > > > > > > > > > > issues
>>> > > > > > > > > > > >> > individually and JobClusterEntrypoint would be the
>>> > > > utility
>>> > > > > > class
>>> > > > > > > > > for
>>> > > > > > > > > > > >> > per-job
>>> > > > > > > > > > > >> > deployment.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > Nevertheless, user program works in both session
>>> > mode
>>> > > > and
>>> > > > > > > > per-job
>>> > > > > > > > > > mode
>>> > > > > > > > > > > >> > without
>>> > > > > > > > > > > >> > necessary to change code. JobClient in per-job mode
>>> > is
>>> > > > > > returned
>>> > > > > > > > > from
>>> > > > > > > > > > > >> > env.execute as normal. However, it would be no
>>> > longer a
>>> > > > > > wrapper
>>> > > > > > > > of
>>> > > > > > > > > > > >> > RestClusterClient but a wrapper of
>>> > PerJobClusterClient
>>> > > > > which
>>> > > > > > > > > > > >> communicates
>>> > > > > > > > > > > >> > to Dispatcher locally.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > 2. How to deal with plan preview.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > With env.compile functions users can get JobGraph 
>>> > > > > > > > > > > >> > or
>>> > > > > > FlinkPlan
>>> > > > > > > > and
>>> > > > > > > > > > > thus
>>> > > > > > > > > > > >> > they can preview the plan programmatically.
>>> > Typically it
>>> > > > > > looks
>>> > > > > > > > > like
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > if (preview configured) {
>>> > > > > > > > > > > >> >     FlinkPlan plan = env.compile();
>>> > > > > > > > > > > >> >     new JSONDumpGenerator(...).dump(plan);
>>> > > > > > > > > > > >> > } else {
>>> > > > > > > > > > > >> >     env.execute();
>>> > > > > > > > > > > >> > }
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > And `flink info` would no longer be valid.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > 3. How to deal with Jar Submission at the Web
>>> > Frontend.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > There is one more thread talked on this topic[1].
>>> > Apart
>>> > > > > from
>>> > > > > > > > > > removing
>>> > > > > > > > > > > >> > the functions there are two alternatives.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > One is to introduce an interface that has a method
>>> > > > > > > > > > > >> > returning JobGraph/FlinkPlan, and Jar Submission would
>>> > > > > > > > > > > >> > only support main classes implementing this interface.
>>> > > > > > > > > > > >> > And then extract the JobGraph/FlinkPlan just by
>>> > calling
>>> > > > > the
>>> > > > > > > > > method.
>>> > > > > > > > > > > >> > In this way, it is even possible to consider a
>>> > > > separation
>>> > > > > > of job
>>> > > > > > > > > > > >> creation
>>> > > > > > > > > > > >> > and job submission.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > The other is, as you mentioned, let execute() do 
>>> > > > > > > > > > > >> > the
>>> > > > > actual
>>> > > > > > > > > > execution.
>>> > > > > > > > > > > >> > We won't execute the main method in the WebFrontend
>>> > but
>>> > > > > > spawn a
>>> > > > > > > > > > > process
>>> > > > > > > > > > > >> > at WebMonitor side to execute. For return part we
>>> > could
>>> > > > > > generate
>>> > > > > > > > > the
>>> > > > > > > > > > > >> > JobID from WebMonitor and pass it to the execution
>>> > > > > > environment.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > 4. How to deal with detached mode.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > I think detached mode is a temporary solution for
>>> > > > > > non-blocking
>>> > > > > > > > > > > >> submission.
>>> > > > > > > > > > > >> > In my document both submission and execution return
>>> > a
>>> > > > > > > > > > > CompletableFuture
>>> > > > > > > > > > > >> and
>>> > > > > > > > > > > >> > users control whether or not wait for the result. 
>>> > > > > > > > > > > >> > In
>>> > > > this
>>> > > > > > point
>>> > > > > > > > we
>>> > > > > > > > > > > don't
>>> > > > > > > > > > > >> > need a detached option but the functionality is
>>> > covered.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > 5. How does per-job mode interact with interactive
>>> > > > > > programming.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > All of YARN, Mesos and Kubernetes scenarios follow
>>> > the
>>> > > > > > pattern
>>> > > > > > > > > > launch
>>> > > > > > > > > > > a
>>> > > > > > > > > > > >> > JobCluster now. And I don't think there would be
>>> > > > > > inconsistency
>>> > > > > > > > > > between
>>> > > > > > > > > > > >> > different resource management.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > Best,
>>> > > > > > > > > > > >> > tison.
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > [1]
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >>
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > https://lists.apache.org/x/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
>>> > > > > > > > > > > >> > [2]
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >>
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?disco=AAAADZaGGfs
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> > Aljoscha Krettek <aljos...@apache.org> wrote on Fri, Aug 16, 2019 at 9:20 PM:
>>> > > > > > > > > > > >> >
>>> > > > > > > > > > > >> >> Hi,
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> I read both Jeff's initial design document and the
>>> > newer
>>> > > > > > > > document
>>> > > > > > > > > by
>>> > > > > > > > > > > >> >> Tison. I also finally found the time to collect 
>>> > > > > > > > > > > >> >> our
>>> > > > > > thoughts on
>>> > > > > > > > > the
>>> > > > > > > > > > > >> issue,
>>> > > > > > > > > > > >> >> I had quite some discussions with Kostas and this
>>> > is
>>> > > > the
>>> > > > > > > > result:
>>> > > > > > > > > > [1].
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> I think overall we agree that this part of the
>>> > code is
>>> > > > in
>>> > > > > > dire
>>> > > > > > > > > need
>>> > > > > > > > > > > of
>>> > > > > > > > > > > >> >> some refactoring/improvements but I think there 
>>> > > > > > > > > > > >> >> are
>>> > > > still
>>> > > > > > some
>>> > > > > > > > > open
>>> > > > > > > > > > > >> >> questions and some differences in opinion what
>>> > those
>>> > > > > > > > refactorings
>>> > > > > > > > > > > >> should
>>> > > > > > > > > > > >> >> look like.
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> I think the API-side is quite clear, i.e. we need
>>> > some
>>> > > > > > > > JobClient
>>> > > > > > > > > > API
>>> > > > > > > > > > > >> that
>>> > > > > > > > > > > >> >> allows interacting with a running Job. It could be
>>> > > > > > worthwhile
>>> > > > > > > > to
>>> > > > > > > > > > spin
>>> > > > > > > > > > > >> that
>>> > > > > > > > > > > >> >> off into a separate FLIP because we can probably
>>> > find
>>> > > > > > consensus
>>> > > > > > > > > on
>>> > > > > > > > > > > that
>>> > > > > > > > > > > >> >> part more easily.
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> For the rest, the main open questions from our doc
>>> > are
>>> > > > > > these:
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >>   - Do we want to separate cluster creation and 
>>> > > > > > > > > > > >> >> job
>>> > > > > > submission
>>> > > > > > > > > for
>>> > > > > > > > > > > >> >> per-job mode? In the past, there were conscious
>>> > efforts
>>> > > > > to
>>> > > > > > > > *not*
>>> > > > > > > > > > > >> separate
>>> > > > > > > > > > > >> >> job submission from cluster creation for per-job
>>> > > > clusters
>>> > > > > > for
>>> > > > > > > > > > Mesos,
>>> > > > > > > > > > > >> YARN,
>>> > > > > > > > > > > >> >> Kubernets (see StandaloneJobClusterEntryPoint).
>>> > Tison
>>> > > > > > suggests
>>> > > > > > > > in
>>> > > > > > > > > > his
>>> > > > > > > > > > > >> >> design document to decouple this in order to unify
>>> > job
>>> > > > > > > > > submission.
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >>   - How to deal with plan preview, which needs to
>>> > > > hijack
>>> > > > > > > > > execute()
>>> > > > > > > > > > > and
>>> > > > > > > > > > > >> >> let the outside code catch an exception?
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >>   - How to deal with Jar Submission at the Web
>>> > > > Frontend,
>>> > > > > > which
>>> > > > > > > > > > needs
>>> > > > > > > > > > > to
>>> > > > > > > > > > > >> >> hijack execute() and let the outside code catch an
>>> > > > > > exception?
>>> > > > > > > > > > > >> >> CliFrontend.run() “hijacks”
>>> > > > > ExecutionEnvironment.execute()
>>> > > > > > to
>>> > > > > > > > > get a
>>> > > > > > > > > > > >> >> JobGraph and then execute that JobGraph manually.
>>> > We
>>> > > > > could
>>> > > > > > get
>>> > > > > > > > > > around
>>> > > > > > > > > > > >> that
>>> > > > > > > > > > > >> >> by letting execute() do the actual execution. One
>>> > > > caveat
>>> > > > > > for
>>> > > > > > > > this
>>> > > > > > > > > > is
>>> > > > > > > > > > > >> that
>>> > > > > > > > > > > >> >> now the main() method doesn’t return (or is forced
>>> > to
>>> > > > > > return by
>>> > > > > > > > > > > >> throwing an
>>> > > > > > > > > > > >> >> exception from execute()) which means that for Jar
>>> > > > > > Submission
>>> > > > > > > > > from
>>> > > > > > > > > > > the
>>> > > > > > > > > > > >> >> WebFrontend we have a long-running main() method
>>> > > > running
>>> > > > > > in the
>>> > > > > > > > > > > >> >> WebFrontend. This doesn’t sound very good. We
>>> > could get
>>> > > > > > around
>>> > > > > > > > > this
>>> > > > > > > > > > > by
>>> > > > > > > > > > > >> >> removing the plan preview feature and by removing
>>> > Jar
>>> > > > > > > > > > > >> Submission/Running.
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >>   - How to deal with detached mode? Right now,
>>> > > > > > > > > DetachedEnvironment
>>> > > > > > > > > > > will
>>> > > > > > > > > > > >> >> execute the job and return immediately. If users
>>> > > > control
>>> > > > > > when
>>> > > > > > > > > they
>>> > > > > > > > > > > >> want to
>>> > > > > > > > > > > >> >> return, by waiting on the job completion future,
>>> > how do
>>> > > > > we
>>> > > > > > deal
>>> > > > > > > > > > with
>>> > > > > > > > > > > >> this?
>>> > > > > > > > > > > >> >> Do we simply remove the distinction between
>>> > > > > > > > > detached/non-detached?
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >>   - How does per-job mode interact with
>>> > “interactive
>>> > > > > > > > programming”
>>> > > > > > > > > > > >> >> (FLIP-36). For YARN, each execute() call could
>>> > spawn a
>>> > > > > new
>>> > > > > > > > Flink
>>> > > > > > > > > > YARN
>>> > > > > > > > > > > >> >> cluster. What about Mesos and Kubernetes?
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> The first open question is where the opinions
>>> > diverge,
>>> > > > I
>>> > > > > > think.
>>> > > > > > > > > The
>>> > > > > > > > > > > >> rest
>>> > > > > > > > > > > >> >> are just open questions and interesting things
>>> > that we
>>> > > > > > need to
>>> > > > > > > > > > > >> consider.
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> Best,
>>> > > > > > > > > > > >> >> Aljoscha
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> [1]
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >>
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
>>> > > > > > > > > > > >> >> <
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >>
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
>>> > > > > > > > > > > >> >> >
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >> >> > On 31. Jul 2019, at 15:23, Jeff Zhang <
>>> > > > > zjf...@gmail.com>
>>> > > > > > > > > wrote:
>>> > > > > > > > > > > >> >> >
>>> > > > > > > > > > > >> >> > Thanks tison for the effort. I left a few
>>> > comments.
>>> > > > > > > > > > > >> >> >
>>> > > > > > > > > > > >> >> >
>>> > > > > > > > > > > >> >> > Zili Chen <wander4...@gmail.com> wrote on Wed, Jul 31, 2019 at 8:24 PM:
>>> > > > > > > > > > > >> >> >
>>> > > > > > > > > > > >> >> >> Hi Flavio,
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >> Thanks for your reply.
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >> In either the current impl or the design,
>>> > ClusterClient
>>> > > > > > > > > > > >> >> >> never takes responsibility for generating
>>> > JobGraph.
>>> > > > > > > > > > > >> >> >> (what you see in current codebase is several
>>> > class
>>> > > > > > methods)
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >> Instead, user describes his program in the main
>>> > > > method
>>> > > > > > > > > > > >> >> >> with ExecutionEnvironment apis and calls
>>> > > > env.compile()
>>> > > > > > > > > > > >> >> >> or env.optimize() to get FlinkPlan and JobGraph
>>> > > > > > > > respectively.
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >> For listing main classes in a jar and choose
>>> > one for
>>> > > > > > > > > > > >> >> >> submission, you're now able to customize a CLI
>>> > to do
>>> > > > > it.
>>> > > > > > > > > > > >> >> >> Specifically, the path of jar is passed as
>>> > arguments
>>> > > > > and
>>> > > > > > > > > > > >> >> >> in the customized CLI you list main classes,
>>> > choose
>>> > > > > one
>>> > > > > > > > > > > >> >> >> to submit to the cluster.
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >> Best,
>>> > > > > > > > > > > >> >> >> tison.
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >> Flavio Pompermaier <pomperma...@okkam.it> wrote on Wed, Jul 31, 2019 at 8:12 PM:
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >> >>> Just one note on my side: it is not clear to 
>>> > > > > > > > > > > >> >> >>> me
>>> > > > > > whether the
>>> > > > > > > > > > > client
>>> > > > > > > > > > > >> >> needs
>>> > > > > > > > > > > >> >> >> to
>>> > > > > > > > > > > >> >> >>> be able to generate a job graph or not.
>>> > > > > > > > > > > >> >> >>> In my opinion, the job jar must reside only
>>> > on the
>>> > > > > > > > > > > >> server/jobManager
>>> > > > > > > > > > > >> >> >> side
>>> > > > > > > > > > > >> >> >>> and the client requires a way to get the job
>>> > graph.
>>> > > > > > > > > > > >> >> >>> If you really want to access the job graph,
>>> > I'd
>>> > > > > add
>>> > > > > > a
>>> > > > > > > > > > > dedicated
>>> > > > > > > > > > > >> >> method
>>> > > > > > > > > > > >> >> >>> on the ClusterClient, like:
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>>   - getJobGraph(jarId, mainClass): JobGraph
>>> > > > > > > > > > > >> >> >>>   - listMainClasses(jarId): List<String>
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>> These would require some additions on the job
>>> > > > > > > > > > > >> >> >>> manager endpoint as well... what do you think?
>>> > > > > > > > > > > >> >> >>>
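>>> > > > > > > > > > > >> >> >>> A rough sketch of how those two additions could look
>>> > > > > > > > > > > >> >> >>> (hypothetical signatures, assuming jarId refers to a
>>> > > > > > > > > > > >> >> >>> jar already uploaded to the cluster):
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>> // list the main classes available in an uploaded jar
>>> > > > > > > > > > > >> >> >>> List<String> listMainClasses(String jarId);
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>> // build the JobGraph for the chosen main class on the
>>> > > > > > > > > > > >> >> >>> // server side and return it to the caller
>>> > > > > > > > > > > >> >> >>> JobGraph getJobGraph(String jarId, String mainClass);
>>> > > > > > > > > > > >> >> >>>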
>>> > > > > > > > > > > >> >> >>> On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <
>>> > > > > > > > > > wander4...@gmail.com
>>> > > > > > > > > > > >
>>> > > > > > > > > > > >> >> wrote:
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>>> Hi all,
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> Here is a document[1] on client api
>>> > enhancement
>>> > > > from
>>> > > > > > our
>>> > > > > > > > > > > >> perspective.
>>> > > > > > > > > > > >> >> >>>> We have investigated current implementations.
>>> > And
>>> > > > we
>>> > > > > > > > propose
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> 1. Unify the implementation of cluster
>>> > deployment
>>> > > > > and
>>> > > > > > job
>>> > > > > > > > > > > >> submission
>>> > > > > > > > > > > >> >> in
>>> > > > > > > > > > > >> >> >>>> Flink.
>>> > > > > > > > > > > >> >> >>>> 2. Provide programmatic interfaces to allow
>>> > > > flexible
>>> > > > > > job
>>> > > > > > > > and
>>> > > > > > > > > > > >> cluster
>>> > > > > > > > > > > >> >> >>>> management.
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> The first proposal is aimed at reducing code
>>> > paths
>>> > > > > of
>>> > > > > > > > > cluster
>>> > > > > > > > > > > >> >> >> deployment
>>> > > > > > > > > > > >> >> >>>> and
>>> > > > > > > > > > > >> >> >>>> job submission so that one can adopt Flink in
>>> > his
>>> > > > > > usage
>>> > > > > > > > > > easily.
>>> > > > > > > > > > > >> The
>>> > > > > > > > > > > >> >> >>> second
>>> > > > > > > > > > > >> >> >>>> proposal is aimed at providing rich
>>> > interfaces for
>>> > > > > > > > advanced
>>> > > > > > > > > > > users
>>> > > > > > > > > > > >> >> >>>> who want to make accurate control of these
>>> > stages.
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> Quick reference on open questions:
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> 1. Exclude job cluster deployment from client
>>> > side
>>> > > > > or
>>> > > > > > > > > redefine
>>> > > > > > > > > > > the
>>> > > > > > > > > > > >> >> >>> semantic
>>> > > > > > > > > > > >> >> >>>> of job cluster? Since it fits in a process
>>> > quite
>>> > > > > > different
>>> > > > > > > > > > from
>>> > > > > > > > > > > >> >> session
>>> > > > > > > > > > > >> >> >>>> cluster deployment and job submission.
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> 2. Maintain the codepaths handling class
>>> > > > > > > > > > > o.a.f.api.common.Program
>>> > > > > > > > > > > >> or
>>> > > > > > > > > > > >> >> >>>> implement customized program handling logic 
>>> > > > > > > > > > > >> >> >>>> by
>>> > > > > > customized
>>> > > > > > > > > > > >> >> CliFrontend?
>>> > > > > > > > > > > >> >> >>>> See also this thread[2] and the document[1].
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> 3. Expose ClusterClient as public api or just
>>> > > > expose
>>> > > > > > api
>>> > > > > > > > in
>>> > > > > > > > > > > >> >> >>>> ExecutionEnvironment
>>> > > > > > > > > > > >> >> >>>> and delegate them to ClusterClient? Further,
>>> > in
>>> > > > > > either way
>>> > > > > > > > > is
>>> > > > > > > > > > it
>>> > > > > > > > > > > >> >> worth
>>> > > > > > > > > > > >> >> >> to
>>> > > > > > > > > > > >> >> >>>> introduce a JobClient which is an
>>> > encapsulation of
>>> > > > > > > > > > ClusterClient
>>> > > > > > > > > > > >> that
>>> > > > > > > > > > > >> >> >>>> associated to specific job?
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> Best,
>>> > > > > > > > > > > >> >> >>>> tison.
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> [1]
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >>
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
>>> > > > > > > > > > > >> >> >>>> [2]
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>
>>> > > > > > > > > > > >> >> >>
>>> > > > > > > > > > > >> >>
>>> > > > > > > > > > > >>
>>> > > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>> Jeff Zhang <zjf...@gmail.com> wrote on Wed, Jul 24, 2019 at 9:19 AM:
>>> > > > > > > > > > > >> >> >>>>
>>> > > > > > > > > > > >> >> >>>>> Thanks Stephan, I will follow up this issue
>>> > in
>>> > > > next
>>> > > > > > few
>>> > > > > > > > > > weeks,
>>> > > > > > > > > > > >> and
>>> > > > > > > > > > > >> >> >> will
>>> > > > > > > > > > > >> >> >>>>> refine the design doc. We could discuss more
>>> > > > > details
>>> > > > > > > > after
>>> > > > > > > > > > 1.9
>>> > > > > > > > > > > >> >> >> release.
>>> > > > > > > > > > > >> >> >>>>>
>>> > > > > > > > > > > >> >> >>>>> Stephan Ewen <se...@apache.org> wrote on Wed, Jul 24, 2019 at 12:58 AM:
>>> > > > > > > > > > > >> >> >>>>>
>>> > > > > > > > > > > >> >> >>>>>> Hi all!
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>> This thread has stalled for a bit, which I
>>> > > > assume
>>> > > > > > is
>>> > > > > > > > > mostly
>>> > > > > > > > > > > >> due to
>>> > > > > > > > > > > >> >> >>> the
>>> > > > > > > > > > > >> >> >>>>>> Flink 1.9 feature freeze and release 
>>> > > > > > > > > > > >> >> >>>>>> testing
>>> > > > > effort.
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>> I personally still recognize this issue as
>>> > one
>>> > > > > > important
>>> > > > > > > > > to
>>> > > > > > > > > > be
>>> > > > > > > > > > > >> >> >>> solved.
>>> > > > > > > > > > > >> >> >>>>> I'd
>>> > > > > > > > > > > >> >> >>>>>> be happy to help resume this discussion 
>>> > > > > > > > > > > >> >> >>>>>> soon
>>> > > > > (after
>>> > > > > > the
>>> > > > > > > > > 1.9
>>> > > > > > > > > > > >> >> >> release)
>>> > > > > > > > > > > >> >> >>>> and
>>> > > > > > > > > > > >> >> >>>>>> see if we can do some step towards this in
>>> > Flink
>>> > > > > > 1.10.
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>> Best,
>>> > > > > > > > > > > >> >> >>>>>> Stephan
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>> On Mon, Jun 24, 2019 at 10:41 AM Flavio
>>> > > > > Pompermaier
>>> > > > > > <
>>> > > > > > > > > > > >> >> >>>>> pomperma...@okkam.it>
>>> > > > > > > > > > > >> >> >>>>>> wrote:
>>> > > > > > > > > > > >> >> >>>>>>
>>> > > > > > > > > > > >> >> >>>>>>> That's exactly what I suggested a long 
>>> > > > > > > > > > > >> >> >>>>>>> time
>>> > > > ago:
>>> > > > > > the
>>> > > > > > > > > Flink
>>> > > > > > > > > > > REST
>>> > > > > > > > > > > >> >> >>>> client
>>> > > > > > > > > > > >> >> >>>>>>> should not require any Flink dependency,
>>> > only
>>> > > > > http
>>> > > > > > > > > library
>>> > > > > > > > > > to
>>> > > > > > > > > > > >> >> >> call
>>> > > > > > > > > > > >> >> >>>> the
>>> > > > > > > > > > > >> >> >>>>>> REST
>>> > > > > > > > > > > >> >> >>>>>>> services to submit and monitor a job.
>>> > > > > > > > > > > >> >> >>>>>>> What I suggested also in [1] was to have a
>>> > way
>>> > > > to
>>> > > > > > > > > > > automatically
>>> > > > > > > > > > > >> >> >>>> suggest
>>> > > > > > > > > > > >> >> >>>>>> the
>>> > > > > > > > > > > >> >> >>>>>>> user (via a UI) the available main classes
>>> > and
>>> > > > > > their
>>> > > > > > > > > > required
>>> > > > > > > > > > > >> >> >>>>>>> parameters[2].
>>> > > > > > > > > > > >> >> >>>>>>> Another problem we have with Flink is that
>>> > the
>>> > > > > Rest
>>> > > > > > > > > client
>>> > > > > > > > > > > and
>>> > > > > > > > > > > >> >> >> the
>>> > > > > > > > > > > >> >> >>>> CLI
>>> > > > > > > > > > > >> >> >>>>>> one
>>> > > > > > > > > > > >> >> >>>>>>> behave differently and we use the CLI
>>> > client
>>> > > > > (via
>>> > > > > > ssh)
>>> > > > > > > > > > > because
>>> > > > > > > > > > > >> >> >> it
>>> > > > > > > > > > > >> >> >>>>> allows
>>> > > > > > > > > > > >> >> >>>>>>> to call some other method after
>>> > env.execute()
>>> > > > [3]
>>> > > > > > (we
>>> > > > > > > > > have
>>> > > > > > > > > > to
>>> > > > > > > > > > > >> >> >> call
>>> > > > > > > > > > > >> >> >>>>>> another
>>> > > > > > > > > > > >> >> >>>>>>> REST service to signal the end of the 
>>> > > > > > > > > > > >> >> >>>>>>> job).
>>> > > > > > > > > > > >> >> >>>>>>> In this regard, a dedicated interface,
>>> > like the
>>> > > > > > > > > JobListener
>>> > > > > > > > > > > >> >> >>> suggested
>>> > > > > > > > > > > >> >> >>>>> in
>>> > > > > > > > > > > >> >> >>>>>>> the previous emails, would be very helpful
>>> > > > > (IMHO).
>>> > > > > > > > > > > >> >> >>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>> [1]
>>> > > > > > https://issues.apache.org/jira/browse/FLINK-10864
>>> > > > > > > > > > > >> >> >>>>>>> [2]
>>> > > > > > https://issues.apache.org/jira/browse/FLINK-10862
>>> > > > > > > > > > > >> >> >>>>>>> [3]
>>> > > > > > https://issues.apache.org/jira/browse/FLINK-10879
>>> > > > > > > > > > > >> >> >>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>> Best,
>>> > > > > > > > > > > >> >> >>>>>>> Flavio
>>> > > > > > > > > > > >> >> >>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>> On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang
>>> > <
>>> > > > > > > > > > zjf...@gmail.com
>>> > > > > > > > > > > >
>>> > > > > > > > > > > >> >> >>> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>> Hi, Tison,
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>> Thanks for your comments. Overall I agree
>>> > with
>>> > > > > you
>>> > > > > > > > that
>>> > > > > > > > > it
>>> > > > > > > > > > > is
>>> > > > > > > > > > > >> >> >>>>> difficult
>>> > > > > > > > > > > >> >> >>>>>>> for
>>> > > > > > > > > > > >> >> >>>>>>>> downstream projects to integrate with
>>> > flink
>>> > > > and
>>> > > > > we
>>> > > > > > > > need
>>> > > > > > > > > to
>>> > > > > > > > > > > >> >> >>> refactor
>>> > > > > > > > > > > >> >> >>>>> the
>>> > > > > > > > > > > >> >> >>>>>>>> current flink client api.
>>> > > > > > > > > > > >> >> >>>>>>>> And I agree that CliFrontend should only
>>> > > > parsing
>>> > > > > > > > command
>>> > > > > > > > > > > line
>>> > > > > > > > > > > >> >> >>>>> arguments
>>> > > > > > > > > > > >> >> >>>>>>> and
>>> > > > > > > > > > > >> >> >>>>>>>> then pass them to ExecutionEnvironment.
>>> > It is
>>> > > > > > > > > > > >> >> >>>> ExecutionEnvironment's
>>> > > > > > > > > > > >> >> >>>>>>>> responsibility to compile job, create
>>> > cluster,
>>> > > > > and
>>> > > > > > > > > submit
>>> > > > > > > > > > > job.
>>> > > > > > > > > > > >> >> >>>>> Besides
>>> > > > > > > > > > > >> >> >>>>>>>> that, Currently flink has many
>>> > > > > > ExecutionEnvironment
>>> > > > > > > > > > > >> >> >>>> implementations,
>>> > > > > > > > > > > >> >> >>>>>> and
>>> > > > > > > > > > > >> >> >>>>>>>> flink will use the specific one based on
>>> > the
>>> > > > > > context.
>>> > > > > > > > > > IMHO,
>>> > > > > > > > > > > it
>>> > > > > > > > > > > >> >> >> is
>>> > > > > > > > > > > >> >> >>>> not
>>> > > > > > > > > > > >> >> >>>>>>>> necessary, ExecutionEnvironment should be
>>> > able
>>> > > > > to
>>> > > > > > do
>>> > > > > > > > the
>>> > > > > > > > > > > right
>>> > > > > > > > > > > >> >> >>>> thing
>>> > > > > > > > > > > >> >> >>>>>>> based
>>> > > > > > > > > > > >> >> >>>>>>>> on the FlinkConf it is received. Too many
>>> > > > > > > > > > > ExecutionEnvironment
>>> > > > > > > > > > > >> >> >>>>>>>> implementation is another burden for
>>> > > > downstream
>>> > > > > > > > project
>>> > > > > > > > > > > >> >> >>>> integration.
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>> One thing I'd like to mention is flink's
>>> > scala
>>> > > > > > shell
>>> > > > > > > > and
>>> > > > > > > > > > sql
>>> > > > > > > > > > > >> >> >>>> client,
>>> > > > > > > > > > > >> >> >>>>>>>> although they are sub-modules of flink,
>>> > they
>>> > > > > > could be
>>> > > > > > > > > > > treated
>>> > > > > > > > > > > >> >> >> as
>>> > > > > > > > > > > >> >> >>>>>>> downstream
>>> > > > > > > > > > > >> >> >>>>>>>> project which use flink's client api.
>>> > > > Currently
>>> > > > > > you
>>> > > > > > > > will
>>> > > > > > > > > > > find
>>> > > > > > > > > > > >> >> >> it
>>> > > > > > > > > > > >> >> >>> is
>>> > > > > > > > > > > >> >> >>>>> not
>>> > > > > > > > > > > >> >> >>>>>>>> easy for them to integrate with flink,
>>> > they
>>> > > > > share
>>> > > > > > many
>>> > > > > > > > > > > >> >> >> duplicated
>>> > > > > > > > > > > >> >> >>>>> code.
>>> > > > > > > > > > > >> >> >>>>>>> It
>>> > > > > > > > > > > >> >> >>>>>>>> is another sign that we should refactor
>>> > flink
>>> > > > > > client
>>> > > > > > > > > api.
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>> I believe it is a large and hard change,
>>> > and I
>>> > > > > am
>>> > > > > > > > afraid
>>> > > > > > > > > > we
>>> > > > > > > > > > > >> can
>>> > > > > > > > > > > >> >> >>> not
>>> > > > > > > > > > > >> >> >>>>>> keep
>>> > > > > > > > > > > >> >> >>>>>>>> compatibility since many of changes are
>>> > user
>>> > > > > > facing.
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>> Zili Chen <wander4...@gmail.com> wrote on Mon, Jun 24, 2019 at 2:53 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> Hi all,
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> After a closer look at our client apis, I can
>>> > I can
>>> > > > > see
>>> > > > > > > > there
>>> > > > > > > > > > are
>>> > > > > > > > > > > >> >> >> two
>>> > > > > > > > > > > >> >> >>>>> major
>>> > > > > > > > > > > >> >> >>>>>>>>> issues to consistency and integration,
>>> > namely
>>> > > > > > > > different
>>> > > > > > > > > > > >> >> >>>> deployment
>>> > > > > > > > > > > >> >> >>>>> of
>>> > > > > > > > > > > >> >> >>>>>>>>> job cluster which couples job graph
>>> > creation
>>> > > > > and
>>> > > > > > > > > cluster
>>> > > > > > > > > > > >> >> >>>>> deployment,
>>> > > > > > > > > > > >> >> >>>>>>>>> and submission via CliFrontend confusing
>>> > > > > control
>>> > > > > > flow
>>> > > > > > > > > of
>>> > > > > > > > > > > job
>>> > > > > > > > > > > >> >> >>>> graph
>>> > > > > > > > > > > >> >> >>>>>>>>> compilation and job submission. I'd like
>>> > to
>>> > > > > > follow
>>> > > > > > > > the
>>> > > > > > > > > > > >> >> >>>>>>>>> discuss above, mainly the process described by Jeff and Stephan, and share my ideas on these issues.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> 1) CliFrontend confuses the control flow of job compilation and submission. Following the process of job submission Stephan and Jeff described, the execution environment knows all configs of the cluster and the topos/settings of the job. Ideally, in the main method of the user program, it calls #execute (or a method named #submit) and Flink deploys the cluster, compiles the job graph and submits it to the cluster. However, the current CliFrontend does all of these things inside its #runProgram method, which introduces a lot of subclasses of (stream) execution environment.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> Actually, it sets up an exec env that hijacks the #execute/executePlan method, initializes the job graph and aborts execution. Then control flows back to CliFrontend, which deploys the cluster (or retrieves the client) and submits the job graph. This is quite a specific internal process inside Flink and is not consistent with anything else.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> 2) Deployment of a job cluster couples job graph creation and cluster deployment. Abstractly, from a user job to a concrete submission, it requires
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>     create JobGraph --\
>>> > > > > > > > > > > >> >> >>>>>>>>>     create ClusterClient --> submit JobGraph
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> such a dependency. The ClusterClient is created by deploying or retrieving. JobGraph submission requires a compiled JobGraph and a valid ClusterClient, but the creation of the ClusterClient is abstractly independent of that of the JobGraph. However, in job cluster mode, we deploy the job cluster with a job graph, which means we use another process:
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>     create JobGraph --> deploy cluster with the JobGraph
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> This is another inconsistency, and downstream projects/client APIs are forced to handle the different cases with little support from Flink.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> Since we likely reached a consensus on
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> 1. all configs gathered by the Flink configuration and passed on
>>> > > > > > > > > > > >> >> >>>>>>>>> 2. the execution environment knows all configs and handles execution (both deployment and submission)
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> for the issues above I propose eliminating these inconsistencies with the following approach:
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> 1) CliFrontend should be exactly a front end, at least for the "run" command. That means it just gathers and passes all config from the command line to the main method of the user program. The execution environment knows all the info and, with an additional utility for ClusterClient, we gracefully get a ClusterClient by deploying or retrieving. In this way, we don't need to hijack the #execute/executePlan methods and can remove the various hacking subclasses of exec env, as well as the #run methods in ClusterClient (for an interface-ized ClusterClient). Now the control flow goes from CliFrontend to the main method and never returns.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> 2) A job cluster means a cluster for a specific job. From another perspective, it is an ephemeral session. We may decouple the deployment from a compiled job graph, instead starting a session with an idle timeout and submitting the job afterwards.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> These topics are better to be aware of and discussed towards a consensus before we go into more detail on design or implementation.
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> Best,
>>> > > > > > > > > > > >> >> >>>>>>>>> tison.
>>> > > > > > > > > > > >> >> >>>>>>>>>
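To make the proposed control flow concrete, here is a minimal sketch (in Java) of an execution environment that receives the full configuration from the front end and then owns the deploy-or-retrieve and submit steps itself. All names below (EnvSketch, ClusterClientFactory, deployOrRetrieve, ...) are illustrative assumptions for the sketch, not existing Flink API.

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    // Illustrative sketch only: the environment is handed the gathered configuration
    // and performs deployment/retrieval plus submission, so nothing hijacks execute().
    public final class EnvSketch {

        interface JobGraph {}                  // stands in for the compiled job
        record JobId(String value) {}          // stands in for Flink's JobID

        interface ClusterClient {
            CompletableFuture<JobId> submitJob(JobGraph jobGraph);
        }

        /** Hypothetical helper: deploy a new cluster or retrieve an existing one from config. */
        interface ClusterClientFactory {
            ClusterClient deployOrRetrieve(Map<String, String> config);
        }

        private final Map<String, String> config;
        private final ClusterClientFactory factory;

        EnvSketch(Map<String, String> config, ClusterClientFactory factory) {
            this.config = config;
            this.factory = factory;
        }

        /** The environment deploys/retrieves and submits; control never returns to the CLI. */
        CompletableFuture<JobId> execute(JobGraph jobGraph) {
            ClusterClient client = factory.deployOrRetrieve(config);
            return client.submitJob(jobGraph);
        }
    }

Under this sketch, the CLI's only job is to turn command-line arguments into the configuration map before invoking the user's main method.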
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Jun 20, 2019 at 3:21 AM:
>>> > > > > > > > > > > >> >> >>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> Hi Jeff,
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> Thanks for raising this thread and the design document!
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> As @Thomas Weise mentioned above, extending Flink's config requires far more effort than it should. Another example is that we achieve detached mode by introducing another execution environment which also hijacks the #execute method.
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> I agree with your idea that the user configures everything and Flink "just" respects it. On this topic I think the unusual control flow when CliFrontend handles the "run" command is the problem. It handles several configs, mainly about cluster settings, and thus the main method of the user program is unaware of them. It also compiles the app to a job graph by running the main method with a hijacked exec env, which constrains the main method further.
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> I'd like to write down a few notes on how configs/args are passed and respected, as well as on decoupling job compilation and submission, and share them on this thread later.
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> Best,
>>> > > > > > > > > > > >> >> >>>>>>>>>> tison.
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 17, 2019 at 7:29 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>> Hi Jeff and Flavio,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>> Thanks Jeff a lot for proposing the design document.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>> We are also working on refactoring ClusterClient to allow flexible and efficient job management in our real-time platform. We would like to draft a document to share our ideas with you.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>> I think it's a good idea to have something like Apache Livy for Flink, and the efforts discussed here take a great step towards it.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>> Regards,
>>> > > > > > > > > > > >> >> >>>>>>>>>>> Xiaogang
>>> > > > > > > > > > > >> >> >>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>> Flavio Pompermaier <pomperma...@okkam.it> wrote on Mon, Jun 17, 2019 at 7:13 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>> Is there any possibility to have something like Apache Livy [1] also for Flink in the future?
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>> [1] https://livy.apache.org/
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>> We should change ClusterClient from a class to an interface. ExecutionEnvironment should only use the ClusterClient interface, which should live in flink-clients, while the concrete implementation class could be in flink-runtime.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>
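A minimal sketch of what such an interface-ized ClusterClient could look like, with no dependency on flink-runtime or optimiser types. The names and signatures below are illustrative assumptions, not the actual Flink classes.

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    // Illustrative sketch only: a client interface that exposes no runtime internals,
    // so it could live in flink-clients while a concrete implementation lives elsewhere.
    public interface ClusterClientSketch extends AutoCloseable {

        record JobId(String value) {}
        enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

        // The payload deliberately hides JobGraph internals behind an opaque representation.
        CompletableFuture<JobId> submitJob(byte[] serializedJob);

        CompletableFuture<JobStatus> getJobStatus(JobId jobId);

        CompletableFuture<Void> cancelJob(JobId jobId);

        CompletableFuture<Collection<JobId>> listJobs();
    }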
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> What happens when a failure/restart in the client happens? There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>> Good point. First we need to define what failure/restart in the client means. IIUC, that usually means a network failure, which will happen in the RestClient class. If my understanding is correct, the restart/retry mechanism should be done in RestClient.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>
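If the retry mechanism does end up in the REST layer, it could be as simple as a bounded retry loop with backoff around each request. The sketch below is a generic illustration under that assumption; it is not the actual RestClient code.

    import java.io.IOException;
    import java.util.concurrent.Callable;

    // Illustrative sketch only: bounded retry with linear backoff around a single request.
    final class RetryingCall {

        static <T> T callWithRetry(Callable<T> request, int maxAttempts, long backoffMillis)
                throws Exception {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be at least 1");
            }
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return request.call();
                } catch (IOException e) {          // treat I/O errors as transient and retry
                    last = e;
                    Thread.sleep(backoffMillis * attempt);
                }
            }
            throw last;
        }
    }

Whether a re-established connection also needs to re-register job listeners (the second half of the quoted question) is a separate concern that a plain request-level retry like this does not address.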
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote on Tue, Jun 11, 2019 at 11:10 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>> Some points to consider:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>> * Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>> * What happens when a failure/restart in the client happens? There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>> Aljoscha
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>> On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> Sorry folks, the design doc is later than you expected. Here's the design doc I drafted, welcome any comments and feedback.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> Stephan Ewen <se...@apache.org> wrote on Thu, Feb 14, 2019 at 8:43 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> Nice that this discussion is happening.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> In the FLIP, we could also revisit the entire role of the environments again.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> Initially, the idea was:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>   - the environments take care of the specific setup for standalone (no setup needed), yarn, mesos, etc.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>   - the session ones have control over the session. The environment holds the session client.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>   - running a job gives a "control" object for that job. That behavior is the same in all environments.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> The actual implementation diverged quite a bit from that. Happy to see a discussion about straightening this out a bit more.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
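The originally intended separation of roles could be sketched roughly as below. The interfaces and method names are illustrative assumptions for the purpose of the sketch, not actual Flink classes.

    import java.util.concurrent.CompletableFuture;

    // Illustrative sketch of the intended roles: environments differ only in cluster setup,
    // session environments own the session-scoped client, and every execution hands back
    // the same kind of per-job "control" object.
    final class EnvironmentRolesSketch {

        /** Per-job control object, identical regardless of the environment that produced it. */
        interface JobControl {
            CompletableFuture<Void> cancel();
            CompletableFuture<String> triggerSavepoint(String targetDirectory);
        }

        /** Environments only differ in how the target cluster (standalone, YARN, Mesos, ...) is set up. */
        interface Environment {
            JobControl executeAsync(String jobName);
        }

        /** A session environment additionally holds the session client for the session's lifetime. */
        interface SessionEnvironment extends Environment, AutoCloseable {
            @Override
            void close() throws Exception;   // ends the session and releases the session client
        }
    }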
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>> Hi folks,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>> Sorry for the late response. It seems we have reached a consensus on this; I will create a FLIP for this with a more detailed design.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>> Thomas Weise <t...@apache.org> wrote on Fri, Dec 21, 2018 at 11:43 AM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Great to see this discussion seeded! The problems you face with the Zeppelin integration are also affecting other downstream projects, like Beam.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> We just enabled the savepoint restore option in RemoteStreamEnvironment [1] and that was more difficult than it should be. The main issue is that the environment and the cluster client aren't decoupled. Ideally it should be possible to just get the matching cluster client from the environment and then control the job through it (environment as factory for cluster client). But note that the environment classes are part of the public API, and it is not straightforward to make larger changes without breaking backward compatibility.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> ClusterClient currently exposes internal classes like JobGraph and StreamGraph. But it should be possible to wrap this with a new public API that brings the required job control capabilities for downstream projects. Perhaps it is helpful to look at some of the interfaces in Beam while thinking about this: [2] for the portable job API and [3] for the old asynchronous job control from the Beam Java SDK.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> The backward compatibility discussion [4] is also relevant here. A new API should shield downstream projects from internals and allow them to interoperate with multiple future Flink versions in the same release line without forced upgrades.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Thanks,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Thomas
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/7249
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
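A public wrapper in the spirit of Beam's PipelineResult could look roughly like the sketch below: a small job-control facade that hides ClusterClient, JobGraph and other internals from downstream projects. Names and signatures are illustrative only.

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;

    // Illustrative sketch only: a public job-control facade for downstream projects.
    public interface JobResultSketch {

        enum State { RUNNING, DONE, FAILED, CANCELLED, UNKNOWN }

        State getState();

        /** Blocks until the job reaches a terminal state or the timeout expires, then reports the state. */
        State waitUntilFinish(Duration timeout) throws InterruptedException;

        CompletableFuture<Void> cancel();

        /** Completes with the savepoint path once the savepoint has been taken. */
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }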
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Users don't need to specify the execution mode programmatically. They can also pass the execution mode via the arguments of the flink run command, e.g.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m yarn-cluster ....
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m local ...
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m host:port ...
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Does this make sense to you?
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example).
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Right, currently I notice that Flink creates different ContextExecutionEnvironments based on the submission scenario (Flink CLI vs. IDE). To me this is kind of a hack, not so straightforward. What I suggested above is that Flink should always create the same ExecutionEnvironment, just with different configuration, and based on that configuration it would create the proper ClusterClient for the different behaviors.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> You are probably right that we have code duplication when it comes to the creation of the ClusterClient. This should be reduced in the future.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time. To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example). However, I agree that the ExecutionEnvironment should give you access to the ClusterClient and to the job (maybe in the form of the JobGraph or a job plan).
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> Cheers,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> Till
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Hi Till,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. You are right that I expect a better programmatic job submission/control API which could be used by downstream projects, and it would benefit the Flink ecosystem. When I look at the code of the flink scala-shell and sql-client (I believe they are not the core of Flink, but belong to the Flink ecosystem), I find a lot of duplicated code for creating a ClusterClient from user-provided configuration (the configuration format may differ between scala-shell and sql-client) and then using that ClusterClient to manipulate jobs. I don't think this is convenient for downstream projects. What I expect is that a downstream project only needs to provide the necessary configuration info (maybe introducing a class FlinkConf), then build an ExecutionEnvironment based on this FlinkConf, and the ExecutionEnvironment will create the proper ClusterClient. This would not only benefit downstream project development but also be helpful for their integration tests with Flink. Here's one sample code snippet of what I expect:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val conf = new FlinkConf().mode("yarn")
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val env = new ExecutionEnvironment(conf)
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobId = env.submit(...)
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> env.getClusterClient().cancelJob(jobId)
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> What do you think?
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Hi Jeff,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> what you are proposing is to provide the user with better programmatic job control. There was actually an effort to achieve this but it has never been completed [1]. However, there are some improvements in the code base now. Look for example at the NewClusterClient interface which offers a non-blocking job submission. But I agree that we need to improve Flink in this regard.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I would not be in favour of exposing all ClusterClient calls via the ExecutionEnvironment because it would clutter the class and would not be a good separation of concerns. Instead, one idea could be to retrieve the current ClusterClient from the ExecutionEnvironment, which can then be used for cluster and job control. But before we start an effort here, we need to agree on and capture what functionality we want to provide.
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Initially, the idea was that we have the ClusterDescriptor describing how to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can be used for deploying Flink clusters (job and session) and gives you a ClusterClient. The ClusterClient controls the cluster (e.g. submitting jobs, listing all running jobs). And then there was the idea to introduce a JobClient which you obtain from the ClusterClient to trigger job specific operations (e.g. taking a savepoint, cancelling the job).
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Cheers,
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Till
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
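The ClusterDescriptor -> ClusterClient -> JobClient layering described here could be captured in three small interfaces, sketched below. The signatures are illustrative assumptions for the sketch, not the real Flink types.

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    // Illustrative sketch of the layering: the descriptor deploys/retrieves clusters,
    // the cluster client owns cluster-scoped operations, the job client owns job-scoped ones.
    final class ClientLayeringSketch {

        record JobId(String value) {}

        /** Knows how to talk to a cluster manager (YARN, Mesos, standalone) and deploy or retrieve clusters. */
        interface ClusterDescriptor {
            ClusterClient deploySessionCluster();
            ClusterClient deployJobCluster(byte[] serializedJobGraph);
            ClusterClient retrieve(String clusterId);
        }

        /** Cluster-scoped operations: submit jobs, list running jobs, hand out per-job clients. */
        interface ClusterClient {
            CompletableFuture<JobId> submitJob(byte[] serializedJobGraph);
            CompletableFuture<Collection<JobId>> listJobs();
            JobClient getJobClient(JobId jobId);
        }

        /** Job-scoped operations, obtained from the ClusterClient. */
        interface JobClient {
            CompletableFuture<String> triggerSavepoint(String targetDirectory);
            CompletableFuture<Void> cancel();
        }
    }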
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> On Tue, Dec 11, 2018 at
>>> > 10:13 AM
>>> > > > > > Jeff
>>> > > > > > > > > Zhang
>>> > > > > > > > > > > >> >> >> <
>>> > > > > > > > > > > >> >> >>>>>>>>>>> zjf...@gmail.com
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> wrote:
>>> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
>>> > > Hi Folks,
>>> > >
>>> > > I am trying to integrate Flink into Apache Zeppelin, which is an
>>> > > interactive notebook, and I hit several issues that are caused by
>>> > > the Flink client API. So I'd like to propose the following changes
>>> > > to the client API.
>>> > >
>>> > > 1. Support nonblocking execution. Currently,
>>> > > ExecutionEnvironment#execute is a blocking method which does two
>>> > > things: first it submits the job, then it waits for the job until it
>>> > > is finished. I'd like to introduce a nonblocking execution method,
>>> > > e.g. ExecutionEnvironment#submit, which only submits the job and
>>> > > then returns the jobId to the client, and allows the user to query
>>> > > the job status via that jobId.
>>> > >
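>>> > > As a minimal sketch of how a caller might use such a method (the
>>> > > submit call and its return type are assumptions for illustration,
>>> > > not existing Flink API):
>>> > >
>>> > > StreamExecutionEnvironment env =
>>> > >     StreamExecutionEnvironment.getExecutionEnvironment();
>>> > > env.addSource(...).map(...).print();   // build the job as usual
>>> > >
>>> > > // Hypothetical nonblocking call: returns immediately with the JobID
>>> > > // instead of blocking until the job finishes.
>>> > > JobID jobId = env.submit("my-job");
>>> > >
>>> > > // The caller can later poll the job status using the returned jobId.
>>> > >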
>>> > > 2. Add a cancel API in
>>> > > ExecutionEnvironment/StreamExecutionEnvironment. Currently the only
>>> > > way to cancel a job is via the CLI (bin/flink), which is not
>>> > > convenient for downstream projects to use. So I'd like to add a
>>> > > cancel API to ExecutionEnvironment.
>>> > >
>>> > > 3. Add a savepoint API in
>>> > > ExecutionEnvironment/StreamExecutionEnvironment. It is similar to
>>> > > the cancel API: we should use ExecutionEnvironment as the unified
>>> > > API for third parties to integrate with Flink.
>>> > >
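>>> > > For illustration only, the two additions could look roughly like
>>> > > this on the environment classes (method names and signatures are
>>> > > assumptions, not existing Flink API):
>>> > >
>>> > > // Hypothetical additions to ExecutionEnvironment /
>>> > > // StreamExecutionEnvironment:
>>> > > void cancel(JobID jobId);                     // cancel a running job
>>> > > String triggerSavepoint(JobID jobId, String targetDirectory);
>>> > >                                               // returns savepoint path
>>> > >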
>>> > > 4. Add a listener for the job execution lifecycle, something like
>>> > > the following, so that downstream projects can run custom logic at
>>> > > each stage of a job's lifecycle. E.g. Zeppelin would capture the
>>> > > jobId after the job is submitted and then use that jobId to cancel
>>> > > the job later when necessary.
>>> > >
>>> > > public interface JobListener {
>>> > >
>>> > >   void onJobSubmitted(JobID jobId);
>>> > >
>>> > >   void onJobExecuted(JobExecutionResult jobResult);
>>> > >
>>> > >   void onJobCanceled(JobID jobId);
>>> > > }
>>> > >
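>>> > > As a hypothetical usage sketch (the registerJobListener method and
>>> > > the submittedJobs collection are assumptions for illustration; only
>>> > > the JobListener interface above is part of the proposal):
>>> > >
>>> > > env.registerJobListener(new JobListener() {
>>> > >     @Override
>>> > >     public void onJobSubmitted(JobID jobId) {
>>> > >         // e.g. Zeppelin remembers the jobId so it can cancel later
>>> > >         submittedJobs.add(jobId);
>>> > >     }
>>> > >     @Override
>>> > >     public void onJobExecuted(JobExecutionResult jobResult) {
>>> > >         // e.g. surface accumulator results in the notebook UI
>>> > >     }
>>> > >     @Override
>>> > >     public void onJobCanceled(JobID jobId) {
>>> > >         submittedJobs.remove(jobId);
>>> > >     }
>>> > > });
>>> > >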
>>> > > 5. Enable sessions in ExecutionEnvironment. Currently this is
>>> > > disabled, but sessions are very convenient for third parties that
>>> > > submit jobs continually. I hope Flink can enable it again.
>>> > >
>>> > > 6. Unify all Flink client APIs into
>>> > > ExecutionEnvironment/StreamExecutionEnvironment.
>>> > >
>>> > > This is a long-term issue which needs more careful thinking and
>>> > > design. Currently some features of Flink are exposed in
>>> > > ExecutionEnvironment/StreamExecutionEnvironment, but others are
>>> > > exposed in the CLI instead of the API, like the cancel and savepoint
>>> > > operations I mentioned above. I think the root cause is that Flink
>>> > > didn't unify the interaction with the cluster. Here I list three
>>> > > scenarios of Flink operation:
>>> > >
>>> > >  - Local job execution. Flink creates a LocalEnvironment and then
>>> > >    uses this LocalEnvironment to create a LocalExecutor for job
>>> > >    execution.
>>> > >  - Remote job execution. Flink creates a ClusterClient first, then
>>> > >    creates a ContextEnvironment based on the ClusterClient, and then
>>> > >    runs the job.
>>> > >  - Job cancellation. Flink creates a ClusterClient first and then
>>> > >    cancels the job via this ClusterClient.
>>> > >
>>> > > As you can see, in the above three scenarios Flink didn't use the
>>> > > same approach (code path) to interact with the cluster.
>>> > > What I propose is the following:
>>> > > Create the proper LocalEnvironment/RemoteEnvironment (based on user
>>> > > configuration) --> use this Environment to create the proper
>>> > > ClusterClient (LocalClusterClient or RestClusterClient) to interact
>>> > > with Flink (job execution or cancellation).
>>> > >
>>> > > This way we can unify the process of local execution and remote
>>> > > execution.
>>> > > And it is much easier for third parties to integrate with Flink,
>>> > > because ExecutionEnvironment is the unified entry point for Flink.
>>> > > What a third party needs to do is just pass configuration to the
>>> > > ExecutionEnvironment, and the ExecutionEnvironment will do the right
>>> > > thing based on that configuration.
>>> > > The Flink CLI can also be considered a consumer of the Flink API: it
>>> > > just passes the configuration to the ExecutionEnvironment and lets
>>> > > the ExecutionEnvironment create the proper ClusterClient, instead of
>>> > > letting the CLI create the ClusterClient directly.
>>> > >
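>>> > > A rough sketch of the proposed flow, purely for illustration (the
>>> > > create and createClusterClient factory methods and the configuration
>>> > > key used here are assumptions, not existing Flink API):
>>> > >
>>> > > // 1. The caller only provides configuration.
>>> > > Configuration config = new Configuration();
>>> > > config.setString("execution.target", "remote");   // or "local"
>>> > >
>>> > > // 2. The environment is chosen from the configuration...
>>> > > ExecutionEnvironment env = ExecutionEnvironment.create(config);
>>> > >
>>> > > // 3. ...and the environment creates the matching ClusterClient
>>> > > //    (LocalClusterClient or RestClusterClient) behind the scenes,
>>> > > //    so job execution and cancellation share one code path.
>>> > > ClusterClient client = env.createClusterClient();
>>> > >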
>>> > > Point 6 would involve large code refactoring, so I think we can
>>> > > defer it to a future release; 1, 2, 3, 4 and 5 could be done at
>>> > > once, I believe. Let me know your comments and feedback, thanks.
>>> > >
>>> > >
>>> > > --
>>> > > Best Regards
>>> > >
>>> > > Jeff Zhang
>>> > >
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
