Hi Till,

Thanks for your update. Nice to hear :-)

Best,
tison.


Till Rohrmann <trohrm...@apache.org> 于2019年8月23日周五 下午10:39写道:

> Hi Tison,
>
> just a quick comment concerning the class loading issues when using the per
> job mode. The community wants to change it so that the
> StandaloneJobClusterEntryPoint actually uses the user code class loader
> with child first class loading [1]. Hence, I hope that this problem will be
> resolved soon.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13840
>
> Cheers,
> Till
>
> On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
> > Hi all,
> >
> > On the topic of web submission, I agree with Till that it only seems
> > to complicate things.
> > It is bad for security, job isolation (anybody can submit/cancel jobs),
> > and its
> > implementation complicates some parts of the code. So, if it were to
> > redesign the
> > WebUI, maybe this part could be left out. In addition, I would say
> > that the ability to cancel
> > jobs could also be left out.
> >
> > Also I would also be in favour of removing the "detached" mode, for
> > the reasons mentioned
> > above (i.e. because now we will have a future representing the result
> > on which the user
> > can choose to wait or not).
> >
> > Now for the separating job submission and cluster creation, I am in
> > favour of keeping both.
> > Once again, the reasons are mentioned above by Stephan, Till, Aljoscha
> > and also Zili seems
> > to agree. They mainly have to do with security, isolation and ease of
> > resource management
> > for the user as he knows that "when my job is done, everything will be
> > cleared up". This is
> > also the experience you get when launching a process on your local OS.
> >
> > On excluding the per-job mode from returning a JobClient or not, I
> > believe that eventually
> > it would be nice to allow users to get back a jobClient. The reason is
> > that 1) I cannot
> > find any objective reason why the user-experience should diverge, and
> > 2) this will be the
> > way that the user will be able to interact with his running job.
> > Assuming that the necessary
> > ports are open for the REST API to work, then I think that the
> > JobClient can run against the
> > REST API without problems. If the needed ports are not open, then we
> > are safe to not return
> > a JobClient, as the user explicitly chose to close all points of
> > communication to his running job.
> >
> > On the topic of not hijacking the "env.execute()" in order to get the
> > Plan, I definitely agree but
> > for the proposal of having a "compile()" method in the env, I would
> > like to have a better look at
> > the existing code.
> >
> > Cheers,
> > Kostas
> >
> > On Fri, Aug 23, 2019 at 5:52 AM Zili Chen <wander4...@gmail.com> wrote:
> > >
> > > Hi Yang,
> > >
> > > It would be helpful if you check Stephan's last comment,
> > > which states that isolation is important.
> > >
> > > For per-job mode, we run a dedicated cluster(maybe it
> > > should have been a couple of JM and TMs during FLIP-6
> > > design) for a specific job. Thus the process is prevented
> > > from other jobs.
> > >
> > > In our cases there was a time we suffered from multi
> > > jobs submitted by different users and they affected
> > > each other so that all ran into an error state. Also,
> > > run the client inside the cluster could save client
> > > resource at some points.
> > >
> > > However, we also face several issues as you mentioned,
> > > that in per-job mode it always uses parent classloader
> > > thus classloading issues occur.
> > >
> > > BTW, one can makes an analogy between session/per-job mode
> > > in  Flink, and client/cluster mode in Spark.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Yang Wang <danrtsey...@gmail.com> 于2019年8月22日周四 上午11:25写道:
> > >
> > > > From the user's perspective, it is really confused about the scope of
> > > > per-job cluster.
> > > >
> > > >
> > > > If it means a flink cluster with single job, so that we could get
> > better
> > > > isolation.
> > > >
> > > > Now it does not matter how we deploy the cluster, directly
> > deploy(mode1)
> > > >
> > > > or start a flink cluster and then submit job through cluster
> > client(mode2).
> > > >
> > > >
> > > > Otherwise, if it just means directly deploy, how should we name the
> > mode2,
> > > >
> > > > session with job or something else?
> > > >
> > > > We could also benefit from the mode2. Users could get the same
> > isolation
> > > > with mode1.
> > > >
> > > > The user code and dependencies will be loaded by user class loader
> > > >
> > > > to avoid class conflict with framework.
> > > >
> > > >
> > > >
> > > > Anyway, both of the two submission modes are useful.
> > > >
> > > > We just need to clarify the concepts.
> > > >
> > > >
> > > >
> > > >
> > > > Best,
> > > >
> > > > Yang
> > > >
> > > > Zili Chen <wander4...@gmail.com> 于2019年8月20日周二 下午5:58写道:
> > > >
> > > > > Thanks for the clarification.
> > > > >
> > > > > The idea JobDeployer ever came into my mind when I was muddled with
> > > > > how to execute per-job mode and session mode with the same user
> code
> > > > > and framework codepath.
> > > > >
> > > > > With the concept JobDeployer we back to the statement that
> > environment
> > > > > knows every configs of cluster deployment and job submission. We
> > > > > configure or generate from configuration a specific JobDeployer in
> > > > > environment and then code align on
> > > > >
> > > > > *JobClient client = env.execute().get();*
> > > > >
> > > > > which in session mode returned by clusterClient.submitJob and in
> > per-job
> > > > > mode returned by clusterDescriptor.deployJobCluster.
> > > > >
> > > > > Here comes a problem that currently we directly run
> ClusterEntrypoint
> > > > > with extracted job graph. Follow the JobDeployer way we'd better
> > > > > align entry point of per-job deployment at JobDeployer. Users run
> > > > > their main method or by a Cli(finally call main method) to deploy
> the
> > > > > job cluster.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > Stephan Ewen <se...@apache.org> 于2019年8月20日周二 下午4:40写道:
> > > > >
> > > > > > Till has made some good comments here.
> > > > > >
> > > > > > Two things to add:
> > > > > >
> > > > > >   - The job mode is very nice in the way that it runs the client
> > inside
> > > > > the
> > > > > > cluster (in the same image/process that is the JM) and thus
> unifies
> > > > both
> > > > > > applications and what the Spark world calls the "driver mode".
> > > > > >
> > > > > >   - Another thing I would add is that during the FLIP-6 design,
> we
> > were
> > > > > > thinking about setups where Dispatcher and JobManager are
> separate
> > > > > > processes.
> > > > > >     A Yarn or Mesos Dispatcher of a session could run
> independently
> > > > (even
> > > > > > as privileged processes executing no code).
> > > > > >     Then you the "per-job" mode could still be helpful: when a
> job
> > is
> > > > > > submitted to the dispatcher, it launches the JM again in a
> per-job
> > > > mode,
> > > > > so
> > > > > > that JM and TM processes are bound to teh job only. For higher
> > security
> > > > > > setups, it is important that processes are not reused across
> jobs.
> > > > > >
> > > > > > On Tue, Aug 20, 2019 at 10:27 AM Till Rohrmann <
> > trohrm...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > I would not be in favour of getting rid of the per-job mode
> > since it
> > > > > > > simplifies the process of running Flink jobs considerably.
> > Moreover,
> > > > it
> > > > > > is
> > > > > > > not only well suited for container deployments but also for
> > > > deployments
> > > > > > > where you want to guarantee job isolation. For example, a user
> > could
> > > > > use
> > > > > > > the per-job mode on Yarn to execute his job on a separate
> > cluster.
> > > > > > >
> > > > > > > I think that having two notions of cluster deployments (session
> > vs.
> > > > > > per-job
> > > > > > > mode) does not necessarily contradict your ideas for the client
> > api
> > > > > > > refactoring. For example one could have the following
> interfaces:
> > > > > > >
> > > > > > > - ClusterDeploymentDescriptor: encapsulates the logic how to
> > deploy a
> > > > > > > cluster.
> > > > > > > - ClusterClient: allows to interact with a cluster
> > > > > > > - JobClient: allows to interact with a running job
> > > > > > >
> > > > > > > Now the ClusterDeploymentDescriptor could have two methods:
> > > > > > >
> > > > > > > - ClusterClient deploySessionCluster()
> > > > > > > - JobClusterClient/JobClient deployPerJobCluster(JobGraph)
> > > > > > >
> > > > > > > where JobClusterClient is either a supertype of ClusterClient
> > which
> > > > > does
> > > > > > > not give you the functionality to submit jobs or
> > deployPerJobCluster
> > > > > > > returns directly a JobClient.
> > > > > > >
> > > > > > > When setting up the ExecutionEnvironment, one would then not
> > provide
> > > > a
> > > > > > > ClusterClient to submit jobs but a JobDeployer which, depending
> > on
> > > > the
> > > > > > > selected mode, either uses a ClusterClient (session mode) to
> > submit
> > > > > jobs
> > > > > > or
> > > > > > > a ClusterDeploymentDescriptor to deploy per a job mode cluster
> > with
> > > > the
> > > > > > job
> > > > > > > to execute.
> > > > > > >
> > > > > > > These are just some thoughts how one could make it working
> > because I
> > > > > > > believe there is some value in using the per job mode from the
> > > > > > > ExecutionEnvironment.
> > > > > > >
> > > > > > > Concerning the web submission, this is indeed a bit tricky.
> From
> > a
> > > > > > cluster
> > > > > > > management stand point, I would in favour of not executing user
> > code
> > > > on
> > > > > > the
> > > > > > > REST endpoint. Especially when considering security, it would
> be
> > good
> > > > > to
> > > > > > > have a well defined cluster behaviour where it is explicitly
> > stated
> > > > > where
> > > > > > > user code and, thus, potentially risky code is executed.
> Ideally
> > we
> > > > > limit
> > > > > > > it to the TaskExecutor and JobMaster.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Tue, Aug 20, 2019 at 9:40 AM Flavio Pompermaier <
> > > > > pomperma...@okkam.it
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > In my opinion the client should not use any environment to
> get
> > the
> > > > > Job
> > > > > > > > graph because the jar should reside ONLY on the cluster (and
> > not in
> > > > > the
> > > > > > > > client classpath otherwise there are always inconsistencies
> > between
> > > > > > > client
> > > > > > > > and Flink Job manager's classpath).
> > > > > > > > In the YARN, Mesos and Kubernetes scenarios you have the jar
> > but
> > > > you
> > > > > > > could
> > > > > > > > start a cluster that has the jar on the Job Manager as well
> > (but
> > > > this
> > > > > > is
> > > > > > > > the only case where I think you can assume that the client
> has
> > the
> > > > > jar
> > > > > > on
> > > > > > > > the classpath..in the REST job submission you don't have any
> > > > > > classpath).
> > > > > > > >
> > > > > > > > Thus, always in my opinion, the JobGraph should be generated
> > by the
> > > > > Job
> > > > > > > > Manager REST API.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Aug 20, 2019 at 9:00 AM Zili Chen <
> > wander4...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > >> I would like to involve Till & Stephan here to clarify some
> > > > concept
> > > > > of
> > > > > > > >> per-job mode.
> > > > > > > >>
> > > > > > > >> The term per-job is one of modes a cluster could run on. It
> is
> > > > > mainly
> > > > > > > >> aimed
> > > > > > > >> at spawn
> > > > > > > >> a dedicated cluster for a specific job while the job could
> be
> > > > > packaged
> > > > > > > >> with
> > > > > > > >> Flink
> > > > > > > >> itself and thus the cluster initialized with job so that get
> > rid
> > > > of
> > > > > a
> > > > > > > >> separated
> > > > > > > >> submission step.
> > > > > > > >>
> > > > > > > >> This is useful for container deployments where one create
> his
> > > > image
> > > > > > with
> > > > > > > >> the job
> > > > > > > >> and then simply deploy the container.
> > > > > > > >>
> > > > > > > >> However, it is out of client scope since a
> > client(ClusterClient
> > > > for
> > > > > > > >> example) is for
> > > > > > > >> communicate with an existing cluster and performance
> actions.
> > > > > > Currently,
> > > > > > > >> in
> > > > > > > >> per-job
> > > > > > > >> mode, we extract the job graph and bundle it into cluster
> > > > deployment
> > > > > > and
> > > > > > > >> thus no
> > > > > > > >> concept of client get involved. It looks like reasonable to
> > > > exclude
> > > > > > the
> > > > > > > >> deployment
> > > > > > > >> of per-job cluster from client api and use dedicated utility
> > > > > > > >> classes(deployers) for
> > > > > > > >> deployment.
> > > > > > > >>
> > > > > > > >> Zili Chen <wander4...@gmail.com> 于2019年8月20日周二 下午12:37写道:
> > > > > > > >>
> > > > > > > >> > Hi Aljoscha,
> > > > > > > >> >
> > > > > > > >> > Thanks for your reply and participance. The Google Doc you
> > > > linked
> > > > > to
> > > > > > > >> > requires
> > > > > > > >> > permission and I think you could use a share link instead.
> > > > > > > >> >
> > > > > > > >> > I agree with that we almost reach a consensus that
> > JobClient is
> > > > > > > >> necessary
> > > > > > > >> > to
> > > > > > > >> > interacte with a running Job.
> > > > > > > >> >
> > > > > > > >> > Let me check your open questions one by one.
> > > > > > > >> >
> > > > > > > >> > 1. Separate cluster creation and job submission for
> per-job
> > > > mode.
> > > > > > > >> >
> > > > > > > >> > As you mentioned here is where the opinions diverge. In my
> > > > > document
> > > > > > > >> there
> > > > > > > >> > is
> > > > > > > >> > an alternative[2] that proposes excluding per-job
> deployment
> > > > from
> > > > > > > client
> > > > > > > >> > api
> > > > > > > >> > scope and now I find it is more reasonable we do the
> > exclusion.
> > > > > > > >> >
> > > > > > > >> > When in per-job mode, a dedicated JobCluster is launched
> to
> > > > > execute
> > > > > > > the
> > > > > > > >> > specific job. It is like a Flink Application more than a
> > > > > submission
> > > > > > > >> > of Flink Job. Client only takes care of job submission and
> > > > assume
> > > > > > > there
> > > > > > > >> is
> > > > > > > >> > an existing cluster. In this way we are able to consider
> > per-job
> > > > > > > issues
> > > > > > > >> > individually and JobClusterEntrypoint would be the utility
> > class
> > > > > for
> > > > > > > >> > per-job
> > > > > > > >> > deployment.
> > > > > > > >> >
> > > > > > > >> > Nevertheless, user program works in both session mode and
> > > > per-job
> > > > > > mode
> > > > > > > >> > without
> > > > > > > >> > necessary to change code. JobClient in per-job mode is
> > returned
> > > > > from
> > > > > > > >> > env.execute as normal. However, it would be no longer a
> > wrapper
> > > > of
> > > > > > > >> > RestClusterClient but a wrapper of PerJobClusterClient
> which
> > > > > > > >> communicates
> > > > > > > >> > to Dispatcher locally.
> > > > > > > >> >
> > > > > > > >> > 2. How to deal with plan preview.
> > > > > > > >> >
> > > > > > > >> > With env.compile functions users can get JobGraph or
> > FlinkPlan
> > > > and
> > > > > > > thus
> > > > > > > >> > they can preview the plan with programming. Typically it
> > looks
> > > > > like
> > > > > > > >> >
> > > > > > > >> > if (preview configured) {
> > > > > > > >> >     FlinkPlan plan = env.compile();
> > > > > > > >> >     new JSONDumpGenerator(...).dump(plan);
> > > > > > > >> > } else {
> > > > > > > >> >     env.execute();
> > > > > > > >> > }
> > > > > > > >> >
> > > > > > > >> > And `flink info` would be invalid any more.
> > > > > > > >> >
> > > > > > > >> > 3. How to deal with Jar Submission at the Web Frontend.
> > > > > > > >> >
> > > > > > > >> > There is one more thread talked on this topic[1]. Apart
> from
> > > > > > removing
> > > > > > > >> > the functions there are two alternatives.
> > > > > > > >> >
> > > > > > > >> > One is to introduce an interface has a method returns
> > > > > > > JobGraph/FilnkPlan
> > > > > > > >> > and Jar Submission only support main-class implements this
> > > > > > interface.
> > > > > > > >> > And then extract the JobGraph/FlinkPlan just by calling
> the
> > > > > method.
> > > > > > > >> > In this way, it is even possible to consider a separation
> > of job
> > > > > > > >> creation
> > > > > > > >> > and job submission.
> > > > > > > >> >
> > > > > > > >> > The other is, as you mentioned, let execute() do the
> actual
> > > > > > execution.
> > > > > > > >> > We won't execute the main method in the WebFrontend but
> > spawn a
> > > > > > > process
> > > > > > > >> > at WebMonitor side to execute. For return part we could
> > generate
> > > > > the
> > > > > > > >> > JobID from WebMonitor and pass it to the execution
> > environemnt.
> > > > > > > >> >
> > > > > > > >> > 4. How to deal with detached mode.
> > > > > > > >> >
> > > > > > > >> > I think detached mode is a temporary solution for
> > non-blocking
> > > > > > > >> submission.
> > > > > > > >> > In my document both submission and execution return a
> > > > > > > CompletableFuture
> > > > > > > >> and
> > > > > > > >> > users control whether or not wait for the result. In this
> > point
> > > > we
> > > > > > > don't
> > > > > > > >> > need a detached option but the functionality is covered.
> > > > > > > >> >
> > > > > > > >> > 5. How does per-job mode interact with interactive
> > programming.
> > > > > > > >> >
> > > > > > > >> > All of YARN, Mesos and Kubernetes scenarios follow the
> > pattern
> > > > > > launch
> > > > > > > a
> > > > > > > >> > JobCluster now. And I don't think there would be
> > inconsistency
> > > > > > between
> > > > > > > >> > different resource management.
> > > > > > > >> >
> > > > > > > >> > Best,
> > > > > > > >> > tison.
> > > > > > > >> >
> > > > > > > >> > [1]
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://lists.apache.org/x/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
> > > > > > > >> > [2]
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?disco=AAAADZaGGfs
> > > > > > > >> >
> > > > > > > >> > Aljoscha Krettek <aljos...@apache.org> 于2019年8月16日周五
> > 下午9:20写道:
> > > > > > > >> >
> > > > > > > >> >> Hi,
> > > > > > > >> >>
> > > > > > > >> >> I read both Jeffs initial design document and the newer
> > > > document
> > > > > by
> > > > > > > >> >> Tison. I also finally found the time to collect our
> > thoughts on
> > > > > the
> > > > > > > >> issue,
> > > > > > > >> >> I had quite some discussions with Kostas and this is the
> > > > result:
> > > > > > [1].
> > > > > > > >> >>
> > > > > > > >> >> I think overall we agree that this part of the code is in
> > dire
> > > > > need
> > > > > > > of
> > > > > > > >> >> some refactoring/improvements but I think there are still
> > some
> > > > > open
> > > > > > > >> >> questions and some differences in opinion what those
> > > > refactorings
> > > > > > > >> should
> > > > > > > >> >> look like.
> > > > > > > >> >>
> > > > > > > >> >> I think the API-side is quite clear, i.e. we need some
> > > > JobClient
> > > > > > API
> > > > > > > >> that
> > > > > > > >> >> allows interacting with a running Job. It could be
> > worthwhile
> > > > to
> > > > > > spin
> > > > > > > >> that
> > > > > > > >> >> off into a separate FLIP because we can probably find
> > consensus
> > > > > on
> > > > > > > that
> > > > > > > >> >> part more easily.
> > > > > > > >> >>
> > > > > > > >> >> For the rest, the main open questions from our doc are
> > these:
> > > > > > > >> >>
> > > > > > > >> >>   - Do we want to separate cluster creation and job
> > submission
> > > > > for
> > > > > > > >> >> per-job mode? In the past, there were conscious efforts
> to
> > > > *not*
> > > > > > > >> separate
> > > > > > > >> >> job submission from cluster creation for per-job clusters
> > for
> > > > > > Mesos,
> > > > > > > >> YARN,
> > > > > > > >> >> Kubernets (see StandaloneJobClusterEntryPoint). Tison
> > suggests
> > > > in
> > > > > > his
> > > > > > > >> >> design document to decouple this in order to unify job
> > > > > submission.
> > > > > > > >> >>
> > > > > > > >> >>   - How to deal with plan preview, which needs to hijack
> > > > > execute()
> > > > > > > and
> > > > > > > >> >> let the outside code catch an exception?
> > > > > > > >> >>
> > > > > > > >> >>   - How to deal with Jar Submission at the Web Frontend,
> > which
> > > > > > needs
> > > > > > > to
> > > > > > > >> >> hijack execute() and let the outside code catch an
> > exception?
> > > > > > > >> >> CliFrontend.run() “hijacks”
> ExecutionEnvironment.execute()
> > to
> > > > > get a
> > > > > > > >> >> JobGraph and then execute that JobGraph manually. We
> could
> > get
> > > > > > around
> > > > > > > >> that
> > > > > > > >> >> by letting execute() do the actual execution. One caveat
> > for
> > > > this
> > > > > > is
> > > > > > > >> that
> > > > > > > >> >> now the main() method doesn’t return (or is forced to
> > return by
> > > > > > > >> throwing an
> > > > > > > >> >> exception from execute()) which means that for Jar
> > Submission
> > > > > from
> > > > > > > the
> > > > > > > >> >> WebFrontend we have a long-running main() method running
> > in the
> > > > > > > >> >> WebFrontend. This doesn’t sound very good. We could get
> > around
> > > > > this
> > > > > > > by
> > > > > > > >> >> removing the plan preview feature and by removing Jar
> > > > > > > >> Submission/Running.
> > > > > > > >> >>
> > > > > > > >> >>   - How to deal with detached mode? Right now,
> > > > > DetachedEnvironment
> > > > > > > will
> > > > > > > >> >> execute the job and return immediately. If users control
> > when
> > > > > they
> > > > > > > >> want to
> > > > > > > >> >> return, by waiting on the job completion future, how do
> we
> > deal
> > > > > > with
> > > > > > > >> this?
> > > > > > > >> >> Do we simply remove the distinction between
> > > > > detached/non-detached?
> > > > > > > >> >>
> > > > > > > >> >>   - How does per-job mode interact with “interactive
> > > > programming”
> > > > > > > >> >> (FLIP-36). For YARN, each execute() call could spawn a
> new
> > > > Flink
> > > > > > YARN
> > > > > > > >> >> cluster. What about Mesos and Kubernetes?
> > > > > > > >> >>
> > > > > > > >> >> The first open question is where the opinions diverge, I
> > think.
> > > > > The
> > > > > > > >> rest
> > > > > > > >> >> are just open questions and interesting things that we
> > need to
> > > > > > > >> consider.
> > > > > > > >> >>
> > > > > > > >> >> Best,
> > > > > > > >> >> Aljoscha
> > > > > > > >> >>
> > > > > > > >> >> [1]
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
> > > > > > > >> >> <
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix
> > > > > > > >> >> >
> > > > > > > >> >>
> > > > > > > >> >> > On 31. Jul 2019, at 15:23, Jeff Zhang <
> zjf...@gmail.com>
> > > > > wrote:
> > > > > > > >> >> >
> > > > > > > >> >> > Thanks tison for the effort. I left a few comments.
> > > > > > > >> >> >
> > > > > > > >> >> >
> > > > > > > >> >> > Zili Chen <wander4...@gmail.com> 于2019年7月31日周三
> 下午8:24写道:
> > > > > > > >> >> >
> > > > > > > >> >> >> Hi Flavio,
> > > > > > > >> >> >>
> > > > > > > >> >> >> Thanks for your reply.
> > > > > > > >> >> >>
> > > > > > > >> >> >> Either current impl and in the design, ClusterClient
> > > > > > > >> >> >> never takes responsibility for generating JobGraph.
> > > > > > > >> >> >> (what you see in current codebase is several class
> > methods)
> > > > > > > >> >> >>
> > > > > > > >> >> >> Instead, user describes his program in the main method
> > > > > > > >> >> >> with ExecutionEnvironment apis and calls env.compile()
> > > > > > > >> >> >> or env.optimize() to get FlinkPlan and JobGraph
> > > > respectively.
> > > > > > > >> >> >>
> > > > > > > >> >> >> For listing main classes in a jar and choose one for
> > > > > > > >> >> >> submission, you're now able to customize a CLI to do
> it.
> > > > > > > >> >> >> Specifically, the path of jar is passed as arguments
> and
> > > > > > > >> >> >> in the customized CLI you list main classes, choose
> one
> > > > > > > >> >> >> to submit to the cluster.
> > > > > > > >> >> >>
> > > > > > > >> >> >> Best,
> > > > > > > >> >> >> tison.
> > > > > > > >> >> >>
> > > > > > > >> >> >>
> > > > > > > >> >> >> Flavio Pompermaier <pomperma...@okkam.it>
> 于2019年7月31日周三
> > > > > > 下午8:12写道:
> > > > > > > >> >> >>
> > > > > > > >> >> >>> Just one note on my side: it is not clear to me
> > whether the
> > > > > > > client
> > > > > > > >> >> needs
> > > > > > > >> >> >> to
> > > > > > > >> >> >>> be able to generate a job graph or not.
> > > > > > > >> >> >>> In my opinion, the job jar must resides only on the
> > > > > > > >> server/jobManager
> > > > > > > >> >> >> side
> > > > > > > >> >> >>> and the client requires a way to get the job graph.
> > > > > > > >> >> >>> If you really want to access to the job graph, I'd
> add
> > a
> > > > > > > dedicated
> > > > > > > >> >> method
> > > > > > > >> >> >>> on the ClusterClient. like:
> > > > > > > >> >> >>>
> > > > > > > >> >> >>>   - getJobGraph(jarId, mainClass): JobGraph
> > > > > > > >> >> >>>   - listMainClasses(jarId): List<String>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>> These would require some addition also on the job
> > manager
> > > > > > > endpoint
> > > > > > > >> as
> > > > > > > >> >> >>> well..what do you think?
> > > > > > > >> >> >>>
> > > > > > > >> >> >>> On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <
> > > > > > wander4...@gmail.com
> > > > > > > >
> > > > > > > >> >> wrote:
> > > > > > > >> >> >>>
> > > > > > > >> >> >>>> Hi all,
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> Here is a document[1] on client api enhancement from
> > our
> > > > > > > >> perspective.
> > > > > > > >> >> >>>> We have investigated current implementations. And we
> > > > propose
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> 1. Unify the implementation of cluster deployment
> and
> > job
> > > > > > > >> submission
> > > > > > > >> >> in
> > > > > > > >> >> >>>> Flink.
> > > > > > > >> >> >>>> 2. Provide programmatic interfaces to allow flexible
> > job
> > > > and
> > > > > > > >> cluster
> > > > > > > >> >> >>>> management.
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> The first proposal is aimed at reducing code paths
> of
> > > > > cluster
> > > > > > > >> >> >> deployment
> > > > > > > >> >> >>>> and
> > > > > > > >> >> >>>> job submission so that one can adopt Flink in his
> > usage
> > > > > > easily.
> > > > > > > >> The
> > > > > > > >> >> >>> second
> > > > > > > >> >> >>>> proposal is aimed at providing rich interfaces for
> > > > advanced
> > > > > > > users
> > > > > > > >> >> >>>> who want to make accurate control of these stages.
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> Quick reference on open questions:
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> 1. Exclude job cluster deployment from client side
> or
> > > > > redefine
> > > > > > > the
> > > > > > > >> >> >>> semantic
> > > > > > > >> >> >>>> of job cluster? Since it fits in a process quite
> > different
> > > > > > from
> > > > > > > >> >> session
> > > > > > > >> >> >>>> cluster deployment and job submission.
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> 2. Maintain the codepaths handling class
> > > > > > > o.a.f.api.common.Program
> > > > > > > >> or
> > > > > > > >> >> >>>> implement customized program handling logic by
> > customized
> > > > > > > >> >> CliFrontend?
> > > > > > > >> >> >>>> See also this thread[2] and the document[1].
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> 3. Expose ClusterClient as public api or just expose
> > api
> > > > in
> > > > > > > >> >> >>>> ExecutionEnvironment
> > > > > > > >> >> >>>> and delegate them to ClusterClient? Further, in
> > either way
> > > > > is
> > > > > > it
> > > > > > > >> >> worth
> > > > > > > >> >> >> to
> > > > > > > >> >> >>>> introduce a JobClient which is an encapsulation of
> > > > > > ClusterClient
> > > > > > > >> that
> > > > > > > >> >> >>>> associated to specific job?
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> Best,
> > > > > > > >> >> >>>> tison.
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> [1]
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
> > > > > > > >> >> >>>> [2]
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>> Jeff Zhang <zjf...@gmail.com> 于2019年7月24日周三
> 上午9:19写道:
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>>> Thanks Stephan, I will follow up this issue in next
> > few
> > > > > > weeks,
> > > > > > > >> and
> > > > > > > >> >> >> will
> > > > > > > >> >> >>>>> refine the design doc. We could discuss more
> details
> > > > after
> > > > > > 1.9
> > > > > > > >> >> >> release.
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>> Stephan Ewen <se...@apache.org> 于2019年7月24日周三
> > 上午12:58写道:
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>>> Hi all!
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>> This thread has stalled for a bit, which I assume
> > ist
> > > > > mostly
> > > > > > > >> due to
> > > > > > > >> >> >>> the
> > > > > > > >> >> >>>>>> Flink 1.9 feature freeze and release testing
> effort.
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>> I personally still recognize this issue as one
> > important
> > > > > to
> > > > > > be
> > > > > > > >> >> >>> solved.
> > > > > > > >> >> >>>>> I'd
> > > > > > > >> >> >>>>>> be happy to help resume this discussion soon
> (after
> > the
> > > > > 1.9
> > > > > > > >> >> >> release)
> > > > > > > >> >> >>>> and
> > > > > > > >> >> >>>>>> see if we can do some step towards this in Flink
> > 1.10.
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>> Best,
> > > > > > > >> >> >>>>>> Stephan
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>> On Mon, Jun 24, 2019 at 10:41 AM Flavio
> Pompermaier
> > <
> > > > > > > >> >> >>>>> pomperma...@okkam.it>
> > > > > > > >> >> >>>>>> wrote:
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>>> That's exactly what I suggested a long time ago:
> > the
> > > > > Flink
> > > > > > > REST
> > > > > > > >> >> >>>> client
> > > > > > > >> >> >>>>>>> should not require any Flink dependency, only
> http
> > > > > library
> > > > > > to
> > > > > > > >> >> >> call
> > > > > > > >> >> >>>> the
> > > > > > > >> >> >>>>>> REST
> > > > > > > >> >> >>>>>>> services to submit and monitor a job.
> > > > > > > >> >> >>>>>>> What I suggested also in [1] was to have a way to
> > > > > > > automatically
> > > > > > > >> >> >>>> suggest
> > > > > > > >> >> >>>>>> the
> > > > > > > >> >> >>>>>>> user (via a UI) the available main classes and
> > their
> > > > > > required
> > > > > > > >> >> >>>>>>> parameters[2].
> > > > > > > >> >> >>>>>>> Another problem we have with Flink is that the
> Rest
> > > > > client
> > > > > > > and
> > > > > > > >> >> >> the
> > > > > > > >> >> >>>> CLI
> > > > > > > >> >> >>>>>> one
> > > > > > > >> >> >>>>>>> behaves differently and we use the CLI client
> (via
> > ssh)
> > > > > > > because
> > > > > > > >> >> >> it
> > > > > > > >> >> >>>>> allows
> > > > > > > >> >> >>>>>>> to call some other method after env.execute() [3]
> > (we
> > > > > have
> > > > > > to
> > > > > > > >> >> >> call
> > > > > > > >> >> >>>>>> another
> > > > > > > >> >> >>>>>>> REST service to signal the end of the job).
> > > > > > > >> >> >>>>>>> Int his regard, a dedicated interface, like the
> > > > > JobListener
> > > > > > > >> >> >>> suggested
> > > > > > > >> >> >>>>> in
> > > > > > > >> >> >>>>>>> the previous emails, would be very helpful
> (IMHO).
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>> [1]
> > https://issues.apache.org/jira/browse/FLINK-10864
> > > > > > > >> >> >>>>>>> [2]
> > https://issues.apache.org/jira/browse/FLINK-10862
> > > > > > > >> >> >>>>>>> [3]
> > https://issues.apache.org/jira/browse/FLINK-10879
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>> Best,
> > > > > > > >> >> >>>>>>> Flavio
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>> On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <
> > > > > > zjf...@gmail.com
> > > > > > > >
> > > > > > > >> >> >>> wrote:
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>>> Hi, Tison,
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>> Thanks for your comments. Overall I agree with
> you
> > > > that
> > > > > it
> > > > > > > is
> > > > > > > >> >> >>>>> difficult
> > > > > > > >> >> >>>>>>> for
> > > > > > > >> >> >>>>>>>> down stream project to integrate with flink and
> we
> > > > need
> > > > > to
> > > > > > > >> >> >>> refactor
> > > > > > > >> >> >>>>> the
> > > > > > > >> >> >>>>>>>> current flink client api.
> > > > > > > >> >> >>>>>>>> And I agree that CliFrontend should only parsing
> > > > command
> > > > > > > line
> > > > > > > >> >> >>>>> arguments
> > > > > > > >> >> >>>>>>> and
> > > > > > > >> >> >>>>>>>> then pass them to ExecutionEnvironment. It is
> > > > > > > >> >> >>>> ExecutionEnvironment's
> > > > > > > >> >> >>>>>>>> responsibility to compile job, create cluster,
> and
> > > > > submit
> > > > > > > job.
> > > > > > > >> >> >>>>> Besides
> > > > > > > >> >> >>>>>>>> that, Currently flink has many
> > ExecutionEnvironment
> > > > > > > >> >> >>>> implementations,
> > > > > > > >> >> >>>>>> and
> > > > > > > >> >> >>>>>>>> flink will use the specific one based on the
> > context.
> > > > > > IMHO,
> > > > > > > it
> > > > > > > >> >> >> is
> > > > > > > >> >> >>>> not
> > > > > > > >> >> >>>>>>>> necessary, ExecutionEnvironment should be able
> to
> > do
> > > > the
> > > > > > > right
> > > > > > > >> >> >>>> thing
> > > > > > > >> >> >>>>>>> based
> > > > > > > >> >> >>>>>>>> on the FlinkConf it is received. Too many
> > > > > > > ExecutionEnvironment
> > > > > > > >> >> >>>>>>>> implementation is another burden for downstream
> > > > project
> > > > > > > >> >> >>>> integration.
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>> One thing I'd like to mention is flink's scala
> > shell
> > > > and
> > > > > > sql
> > > > > > > >> >> >>>> client,
> > > > > > > >> >> >>>>>>>> although they are sub-modules of flink, they
> > could be
> > > > > > > treated
> > > > > > > >> >> >> as
> > > > > > > >> >> >>>>>>> downstream
> > > > > > > >> >> >>>>>>>> project which use flink's client api. Currently
> > you
> > > > will
> > > > > > > find
> > > > > > > >> >> >> it
> > > > > > > >> >> >>> is
> > > > > > > >> >> >>>>> not
> > > > > > > >> >> >>>>>>>> easy for them to integrate with flink, they
> share
> > many
> > > > > > > >> >> >> duplicated
> > > > > > > >> >> >>>>> code.
> > > > > > > >> >> >>>>>>> It
> > > > > > > >> >> >>>>>>>> is another sign that we should refactor flink
> > client
> > > > > api.
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>> I believe it is a large and hard change, and I
> am
> > > > afraid
> > > > > > we
> > > > > > > >> can
> > > > > > > >> >> >>> not
> > > > > > > >> >> >>>>>> keep
> > > > > > > >> >> >>>>>>>> compatibility since many of changes are user
> > facing.
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>> Zili Chen <wander4...@gmail.com> 于2019年6月24日周一
> > > > > 下午2:53写道:
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>>> Hi all,
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> After a closer look on our client apis, I can
> see
> > > > there
> > > > > > are
> > > > > > > >> >> >> two
> > > > > > > >> >> >>>>> major
> > > > > > > >> >> >>>>>>>>> issues to consistency and integration, namely
> > > > different
> > > > > > > >> >> >>>> deployment
> > > > > > > >> >> >>>>> of
> > > > > > > >> >> >>>>>>>>> job cluster which couples job graph creation
> and
> > > > > cluster
> > > > > > > >> >> >>>>> deployment,
> > > > > > > >> >> >>>>>>>>> and submission via CliFrontend confusing
> control
> > flow
> > > > > of
> > > > > > > job
> > > > > > > >> >> >>>> graph
> > > > > > > >> >> >>>>>>>>> compilation and job submission. I'd like to
> > follow
> > > > the
> > > > > > > >> >> >> discuss
> > > > > > > >> >> >>>>> above,
> > > > > > > >> >> >>>>>>>>> mainly the process described by Jeff and
> > Stephan, and
> > > > > > share
> > > > > > > >> >> >> my
> > > > > > > >> >> >>>>>>>>> ideas on these issues.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> 1) CliFrontend confuses the control flow of job
> > > > > > compilation
> > > > > > > >> >> >> and
> > > > > > > >> >> >>>>>>>> submission.
> > > > > > > >> >> >>>>>>>>> Following the process of job submission Stephan
> > and
> > > > > Jeff
> > > > > > > >> >> >>>> described,
> > > > > > > >> >> >>>>>>>>> execution environment knows all configs of the
> > > > cluster
> > > > > > and
> > > > > > > >> >> >>>>>>> topos/settings
> > > > > > > >> >> >>>>>>>>> of the job. Ideally, in the main method of user
> > > > > program,
> > > > > > it
> > > > > > > >> >> >>> calls
> > > > > > > >> >> >>>>>>>> #execute
> > > > > > > >> >> >>>>>>>>> (or named #submit) and Flink deploys the
> cluster,
> > > > > compile
> > > > > > > the
> > > > > > > >> >> >>> job
> > > > > > > >> >> >>>>>> graph
> > > > > > > >> >> >>>>>>>>> and submit it to the cluster. However, current
> > > > > > CliFrontend
> > > > > > > >> >> >> does
> > > > > > > >> >> >>>> all
> > > > > > > >> >> >>>>>>> these
> > > > > > > >> >> >>>>>>>>> things inside its #runProgram method, which
> > > > introduces
> > > > > a
> > > > > > > lot
> > > > > > > >> >> >> of
> > > > > > > >> >> >>>>>>>> subclasses
> > > > > > > >> >> >>>>>>>>> of (stream) execution environment.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> Actually, it sets up an exec env that hijacks
> the
> > > > > > > >> >> >>>>>> #execute/executePlan
> > > > > > > >> >> >>>>>>>>> method, initializes the job graph and abort
> > > > execution.
> > > > > > And
> > > > > > > >> >> >> then
> > > > > > > >> >> >>>>>>>>> control flow back to CliFrontend, it deploys
> the
> > > > > > cluster(or
> > > > > > > >> >> >>>>> retrieve
> > > > > > > >> >> >>>>>>>>> the client) and submits the job graph. This is
> > quite
> > > > a
> > > > > > > >> >> >> specific
> > > > > > > >> >> >>>>>>> internal
> > > > > > > >> >> >>>>>>>>> process inside Flink and none of consistency to
> > > > > anything.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> 2) Deployment of job cluster couples job graph
> > > > creation
> > > > > > and
> > > > > > > >> >> >>>> cluster
> > > > > > > >> >> >>>>>>>>> deployment. Abstractly, from user job to a
> > concrete
> > > > > > > >> >> >> submission,
> > > > > > > >> >> >>>> it
> > > > > > > >> >> >>>>>>>> requires
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>>     create JobGraph --\
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> create ClusterClient -->  submit JobGraph
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> such a dependency. ClusterClient was created by
> > > > > deploying
> > > > > > > or
> > > > > > > >> >> >>>>>>> retrieving.
> > > > > > > >> >> >>>>>>>>> JobGraph submission requires a compiled
> JobGraph
> > and
> > > > > > valid
> > > > > > > >> >> >>>>>>> ClusterClient,
> > > > > > > >> >> >>>>>>>>> but the creation of ClusterClient is abstractly
> > > > > > independent
> > > > > > > >> >> >> of
> > > > > > > >> >> >>>> that
> > > > > > > >> >> >>>>>> of
> > > > > > > >> >> >>>>>>>>> JobGraph. However, in job cluster mode, we
> > deploy job
> > > > > > > cluster
> > > > > > > >> >> >>>> with
> > > > > > > >> >> >>>>> a
> > > > > > > >> >> >>>>>>> job
> > > > > > > >> >> >>>>>>>>> graph, which means we use another process:
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> create JobGraph --> deploy cluster with the
> > JobGraph
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> Here is another inconsistency and downstream
> > > > > > > projects/client
> > > > > > > >> >> >>> apis
> > > > > > > >> >> >>>>> are
> > > > > > > >> >> >>>>>>>>> forced to handle different cases with rare
> > supports
> > > > > from
> > > > > > > >> >> >> Flink.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> Since we likely reached a consensus on
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> 1. all configs gathered by Flink configuration
> > and
> > > > > passed
> > > > > > > >> >> >>>>>>>>> 2. execution environment knows all configs and
> > > > handles
> > > > > > > >> >> >>>>> execution(both
> > > > > > > >> >> >>>>>>>>> deployment and submission)
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> to the issues above I propose eliminating
> > > > > inconsistencies
> > > > > > > by
> > > > > > > >> >> >>>>>> following
> > > > > > > >> >> >>>>>>>>> approach:
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> 1) CliFrontend should exactly be a front end,
> at
> > > > least
> > > > > > for
> > > > > > > >> >> >>> "run"
> > > > > > > >> >> >>>>>>> command.
> > > > > > > >> >> >>>>>>>>> That means it just gathered and passed all
> config
> > > > from
> > > > > > > >> >> >> command
> > > > > > > >> >> >>>> line
> > > > > > > >> >> >>>>>> to
> > > > > > > >> >> >>>>>>>>> the main method of user program. Execution
> > > > environment
> > > > > > > knows
> > > > > > > >> >> >>> all
> > > > > > > >> >> >>>>> the
> > > > > > > >> >> >>>>>>> info
> > > > > > > >> >> >>>>>>>>> and with an addition to utils for
> ClusterClient,
> > we
> > > > > > > >> >> >> gracefully
> > > > > > > >> >> >>>> get
> > > > > > > >> >> >>>>> a
> > > > > > > >> >> >>>>>>>>> ClusterClient by deploying or retrieving. In
> this
> > > > way,
> > > > > we
> > > > > > > >> >> >> don't
> > > > > > > >> >> >>>>> need
> > > > > > > >> >> >>>>>> to
> > > > > > > >> >> >>>>>>>>> hijack #execute/executePlan methods and can
> > remove
> > > > > > various
> > > > > > > >> >> >>>> hacking
> > > > > > > >> >> >>>>>>>>> subclasses of exec env, as well as #run methods
> > in
> > > > > > > >> >> >>>>> ClusterClient(for
> > > > > > > >> >> >>>>>> an
> > > > > > > >> >> >>>>>>>>> interface-ized ClusterClient). Now the control
> > flow
> > > > > flows
> > > > > > > >> >> >> from
> > > > > > > >> >> >>>>>>>> CliFrontend
> > > > > > > >> >> >>>>>>>>> to the main method and never returns.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> 2) Job cluster means a cluster for the specific
> > job.
> > > > > From
> > > > > > > >> >> >>> another
> > > > > > > >> >> >>>>>>>>> perspective, it is an ephemeral session. We may
> > > > > decouple
> > > > > > > the
> > > > > > > >> >> >>>>>> deployment
> > > > > > > >> >> >>>>>>>>> with a compiled job graph, but start a session
> > with
> > > > > idle
> > > > > > > >> >> >>> timeout
> > > > > > > >> >> >>>>>>>>> and submit the job following.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> These topics, before we go into more details on
> > > > design
> > > > > or
> > > > > > > >> >> >>>>>>> implementation,
> > > > > > > >> >> >>>>>>>>> are better to be aware and discussed for a
> > consensus.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> Best,
> > > > > > > >> >> >>>>>>>>> tison.
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>> Zili Chen <wander4...@gmail.com> 于2019年6月20日周四
> > > > > 上午3:21写道:
> > > > > > > >> >> >>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> Hi Jeff,
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> Thanks for raising this thread and the design
> > > > > document!
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> As @Thomas Weise mentioned above, extending
> > config
> > > > to
> > > > > > > flink
> > > > > > > >> >> >>>>>>>>>> requires far more effort than it should be.
> > Another
> > > > > > > example
> > > > > > > >> >> >>>>>>>>>> is we achieve detach mode by introduce another
> > > > > execution
> > > > > > > >> >> >>>>>>>>>> environment which also hijack #execute method.
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> I agree with your idea that user would
> > configure all
> > > > > > > things
> > > > > > > >> >> >>>>>>>>>> and flink "just" respect it. On this topic I
> > think
> > > > the
> > > > > > > >> >> >> unusual
> > > > > > > >> >> >>>>>>>>>> control flow when CliFrontend handle "run"
> > command
> > > > is
> > > > > > the
> > > > > > > >> >> >>>> problem.
> > > > > > > >> >> >>>>>>>>>> It handles several configs, mainly about
> cluster
> > > > > > settings,
> > > > > > > >> >> >> and
> > > > > > > >> >> >>>>>>>>>> thus main method of user program is unaware of
> > them.
> > > > > > Also
> > > > > > > it
> > > > > > > >> >> >>>>>> compiles
> > > > > > > >> >> >>>>>>>>>> app to job graph by run the main method with a
> > > > > hijacked
> > > > > > > exec
> > > > > > > >> >> >>>> env,
> > > > > > > >> >> >>>>>>>>>> which constrain the main method further.
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> I'd like to write down a few of notes on
> > > > configs/args
> > > > > > pass
> > > > > > > >> >> >> and
> > > > > > > >> >> >>>>>>> respect,
> > > > > > > >> >> >>>>>>>>>> as well as decoupling job compilation and
> > > > submission.
> > > > > > > Share
> > > > > > > >> >> >> on
> > > > > > > >> >> >>>>> this
> > > > > > > >> >> >>>>>>>>>> thread later.
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> Best,
> > > > > > > >> >> >>>>>>>>>> tison.
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com>
> > 于2019年6月17日周一
> > > > > > > >> >> >> 下午7:29写道:
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>> Hi Jeff and Flavio,
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>> Thanks Jeff a lot for proposing the design
> > > > document.
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>> We are also working on refactoring
> > ClusterClient to
> > > > > > allow
> > > > > > > >> >> >>>>> flexible
> > > > > > > >> >> >>>>>>> and
> > > > > > > >> >> >>>>>>>>>>> efficient job management in our real-time
> > platform.
> > > > > > > >> >> >>>>>>>>>>> We would like to draft a document to share
> our
> > > > ideas
> > > > > > with
> > > > > > > >> >> >>> you.
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>> I think it's a good idea to have something
> like
> > > > > Apache
> > > > > > > Livy
> > > > > > > >> >> >>> for
> > > > > > > >> >> >>>>>>> Flink,
> > > > > > > >> >> >>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>> the efforts discussed here will take a great
> > step
> > > > > > forward
> > > > > > > >> >> >> to
> > > > > > > >> >> >>>> it.
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>> Regards,
> > > > > > > >> >> >>>>>>>>>>> Xiaogang
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>> Flavio Pompermaier <pomperma...@okkam.it>
> > > > > > 于2019年6月17日周一
> > > > > > > >> >> >>>>> 下午7:13写道:
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>> Is there any possibility to have something
> > like
> > > > > Apache
> > > > > > > >> >> >> Livy
> > > > > > > >> >> >>>> [1]
> > > > > > > >> >> >>>>>>> also
> > > > > > > >> >> >>>>>>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>> Flink in the future?
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>> [1] https://livy.apache.org/
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <
> > > > > > > >> >> >>> zjf...@gmail.com
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>>>> wrote:
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> Any API we expose should not have
> > dependencies
> > > > > on
> > > > > > > >> >> >>> the
> > > > > > > >> >> >>>>>>> runtime
> > > > > > > >> >> >>>>>>>>>>>>> (flink-runtime) package or other
> > implementation
> > > > > > > >> >> >> details.
> > > > > > > >> >> >>> To
> > > > > > > >> >> >>>>> me,
> > > > > > > >> >> >>>>>>>> this
> > > > > > > >> >> >>>>>>>>>>>> means
> > > > > > > >> >> >>>>>>>>>>>>> that the current ClusterClient cannot be
> > exposed
> > > > to
> > > > > > > >> >> >> users
> > > > > > > >> >> >>>>>> because
> > > > > > > >> >> >>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>> uses
> > > > > > > >> >> >>>>>>>>>>>>> quite some classes from the optimiser and
> > runtime
> > > > > > > >> >> >>> packages.
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>> We should change ClusterClient from class
> to
> > > > > > interface.
> > > > > > > >> >> >>>>>>>>>>>>> ExecutionEnvironment only use the interface
> > > > > > > >> >> >> ClusterClient
> > > > > > > >> >> >>>>> which
> > > > > > > >> >> >>>>>>>>>>> should be
> > > > > > > >> >> >>>>>>>>>>>>> in flink-clients while the concrete
> > > > implementation
> > > > > > > >> >> >> class
> > > > > > > >> >> >>>>> could
> > > > > > > >> >> >>>>>> be
> > > > > > > >> >> >>>>>>>> in
> > > > > > > >> >> >>>>>>>>>>>>> flink-runtime.
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> What happens when a failure/restart in
> the
> > > > > client
> > > > > > > >> >> >>>>> happens?
> > > > > > > >> >> >>>>>>>> There
> > > > > > > >> >> >>>>>>>>>>> need
> > > > > > > >> >> >>>>>>>>>>>>> to be a way of re-establishing the
> > connection to
> > > > > the
> > > > > > > >> >> >> job,
> > > > > > > >> >> >>>> set
> > > > > > > >> >> >>>>>> up
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> listeners again, etc.
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>> Good point.  First we need to define what
> > does
> > > > > > > >> >> >>>>> failure/restart
> > > > > > > >> >> >>>>>> in
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> client mean. IIUC, that usually mean
> network
> > > > > failure
> > > > > > > >> >> >>> which
> > > > > > > >> >> >>>>> will
> > > > > > > >> >> >>>>>>>>>>> happen in
> > > > > > > >> >> >>>>>>>>>>>>> class RestClient. If my understanding is
> > correct,
> > > > > > > >> >> >>>>> restart/retry
> > > > > > > >> >> >>>>>>>>>>> mechanism
> > > > > > > >> >> >>>>>>>>>>>>> should be done in RestClient.
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org>
> > > > > 于2019年6月11日周二
> > > > > > > >> >> >>>>>> 下午11:10写道:
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>> Some points to consider:
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>> * Any API we expose should not have
> > dependencies
> > > > > on
> > > > > > > >> >> >> the
> > > > > > > >> >> >>>>>> runtime
> > > > > > > >> >> >>>>>>>>>>>>>> (flink-runtime) package or other
> > implementation
> > > > > > > >> >> >>> details.
> > > > > > > >> >> >>>> To
> > > > > > > >> >> >>>>>> me,
> > > > > > > >> >> >>>>>>>>>>> this
> > > > > > > >> >> >>>>>>>>>>>>> means
> > > > > > > >> >> >>>>>>>>>>>>>> that the current ClusterClient cannot be
> > exposed
> > > > > to
> > > > > > > >> >> >>> users
> > > > > > > >> >> >>>>>>> because
> > > > > > > >> >> >>>>>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>>> uses
> > > > > > > >> >> >>>>>>>>>>>>>> quite some classes from the optimiser and
> > > > runtime
> > > > > > > >> >> >>>> packages.
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>> * What happens when a failure/restart in
> the
> > > > > client
> > > > > > > >> >> >>>>> happens?
> > > > > > > >> >> >>>>>>>> There
> > > > > > > >> >> >>>>>>>>>>> need
> > > > > > > >> >> >>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>> be a way of re-establishing the connection
> > to
> > > > the
> > > > > > > >> >> >> job,
> > > > > > > >> >> >>>> set
> > > > > > > >> >> >>>>> up
> > > > > > > >> >> >>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> listeners
> > > > > > > >> >> >>>>>>>>>>>>>> again, etc.
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>> Aljoscha
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>> On 29. May 2019, at 10:17, Jeff Zhang <
> > > > > > > >> >> >>>> zjf...@gmail.com>
> > > > > > > >> >> >>>>>>>> wrote:
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>> Sorry folks, the design doc is late as
> you
> > > > > > > >> >> >> expected.
> > > > > > > >> >> >>>>> Here's
> > > > > > > >> >> >>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>> design
> > > > > > > >> >> >>>>>>>>>>>>>> doc
> > > > > > > >> >> >>>>>>>>>>>>>>> I drafted, welcome any comments and
> > feedback.
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>> Stephan Ewen <se...@apache.org>
> > 于2019年2月14日周四
> > > > > > > >> >> >>>> 下午8:43写道:
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> Nice that this discussion is happening.
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> In the FLIP, we could also revisit the
> > entire
> > > > > role
> > > > > > > >> >> >>> of
> > > > > > > >> >> >>>>> the
> > > > > > > >> >> >>>>>>>>>>>> environments
> > > > > > > >> >> >>>>>>>>>>>>>>>> again.
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> Initially, the idea was:
> > > > > > > >> >> >>>>>>>>>>>>>>>> - the environments take care of the
> > specific
> > > > > > > >> >> >> setup
> > > > > > > >> >> >>>> for
> > > > > > > >> >> >>>>>>>>>>> standalone
> > > > > > > >> >> >>>>>>>>>>>> (no
> > > > > > > >> >> >>>>>>>>>>>>>>>> setup needed), yarn, mesos, etc.
> > > > > > > >> >> >>>>>>>>>>>>>>>> - the session ones have control over the
> > > > > session.
> > > > > > > >> >> >>> The
> > > > > > > >> >> >>>>>>>>>>> environment
> > > > > > > >> >> >>>>>>>>>>>>> holds
> > > > > > > >> >> >>>>>>>>>>>>>>>> the session client.
> > > > > > > >> >> >>>>>>>>>>>>>>>> - running a job gives a "control" object
> > for
> > > > > that
> > > > > > > >> >> >>>> job.
> > > > > > > >> >> >>>>>> That
> > > > > > > >> >> >>>>>>>>>>>> behavior
> > > > > > > >> >> >>>>>>>>>>>>> is
> > > > > > > >> >> >>>>>>>>>>>>>>>> the same in all environments.
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> The actual implementation diverged quite
> > a bit
> > > > > > > >> >> >> from
> > > > > > > >> >> >>>>> that.
> > > > > > > >> >> >>>>>>>> Happy
> > > > > > > >> >> >>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>> see a
> > > > > > > >> >> >>>>>>>>>>>>>>>> discussion about straitening this out a
> > bit
> > > > > more.
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>> On Tue, Feb 12, 2019 at 4:58 AM Jeff
> > Zhang <
> > > > > > > >> >> >>>>>>> zjf...@gmail.com>
> > > > > > > >> >> >>>>>>>>>>>> wrote:
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>> Hi folks,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>> Sorry for late response, It seems we
> > reach
> > > > > > > >> >> >>> consensus
> > > > > > > >> >> >>>> on
> > > > > > > >> >> >>>>>>>> this, I
> > > > > > > >> >> >>>>>>>>>>>> will
> > > > > > > >> >> >>>>>>>>>>>>>>>> create
> > > > > > > >> >> >>>>>>>>>>>>>>>>> FLIP for this with more detailed design
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>> Thomas Weise <t...@apache.org>
> > 于2018年12月21日周五
> > > > > > > >> >> >>>>> 上午11:43写道:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> Great to see this discussion seeded!
> The
> > > > > > > >> >> >> problems
> > > > > > > >> >> >>>> you
> > > > > > > >> >> >>>>>> face
> > > > > > > >> >> >>>>>>>>>>> with
> > > > > > > >> >> >>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> Zeppelin integration are also
> affecting
> > > > other
> > > > > > > >> >> >>>>> downstream
> > > > > > > >> >> >>>>>>>>>>> projects,
> > > > > > > >> >> >>>>>>>>>>>>>> like
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> Beam.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> We just enabled the savepoint restore
> > option
> > > > > in
> > > > > > > >> >> >>>>>>>>>>>>>> RemoteStreamEnvironment
> > > > > > > >> >> >>>>>>>>>>>>>>>>> [1]
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> and that was more difficult than it
> > should
> > > > be.
> > > > > > > >> >> >> The
> > > > > > > >> >> >>>>> main
> > > > > > > >> >> >>>>>>>> issue
> > > > > > > >> >> >>>>>>>>>>> is
> > > > > > > >> >> >>>>>>>>>>>>> that
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> environment and cluster client aren't
> > > > > decoupled.
> > > > > > > >> >> >>>>> Ideally
> > > > > > > >> >> >>>>>>> it
> > > > > > > >> >> >>>>>>>>>>> should
> > > > > > > >> >> >>>>>>>>>>>>> be
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> possible to just get the matching
> > cluster
> > > > > client
> > > > > > > >> >> >>>> from
> > > > > > > >> >> >>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> environment
> > > > > > > >> >> >>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> then control the job through it
> > (environment
> > > > > as
> > > > > > > >> >> >>>>> factory
> > > > > > > >> >> >>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>> cluster
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> client). But note that the environment
> > > > classes
> > > > > > > >> >> >> are
> > > > > > > >> >> >>>>> part
> > > > > > > >> >> >>>>>> of
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> public
> > > > > > > >> >> >>>>>>>>>>>>>>>>> API,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> and it is not straightforward to make
> > larger
> > > > > > > >> >> >>> changes
> > > > > > > >> >> >>>>>>> without
> > > > > > > >> >> >>>>>>>>>>>>> breaking
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> backward compatibility.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> ClusterClient currently exposes
> internal
> > > > > classes
> > > > > > > >> >> >>>> like
> > > > > > > >> >> >>>>>>>>>>> JobGraph and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> StreamGraph. But it should be possible
> > to
> > > > wrap
> > > > > > > >> >> >>> this
> > > > > > > >> >> >>>>>> with a
> > > > > > > >> >> >>>>>>>> new
> > > > > > > >> >> >>>>>>>>>>>>> public
> > > > > > > >> >> >>>>>>>>>>>>>>>> API
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> that brings the required job control
> > > > > > > >> >> >> capabilities
> > > > > > > >> >> >>>> for
> > > > > > > >> >> >>>>>>>>>>> downstream
> > > > > > > >> >> >>>>>>>>>>>>>>>>> projects.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> Perhaps it is helpful to look at some
> > of the
> > > > > > > >> >> >>>>> interfaces
> > > > > > > >> >> >>>>>> in
> > > > > > > >> >> >>>>>>>>>>> Beam
> > > > > > > >> >> >>>>>>>>>>>>> while
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> thinking about this: [2] for the
> > portable
> > > > job
> > > > > > > >> >> >> API
> > > > > > > >> >> >>>> and
> > > > > > > >> >> >>>>>> [3]
> > > > > > > >> >> >>>>>>>> for
> > > > > > > >> >> >>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> old
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> asynchronous job control from the Beam
> > Java
> > > > > SDK.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> The backward compatibility discussion
> > [4] is
> > > > > > > >> >> >> also
> > > > > > > >> >> >>>>>> relevant
> > > > > > > >> >> >>>>>>>>>>> here. A
> > > > > > > >> >> >>>>>>>>>>>>> new
> > > > > > > >> >> >>>>>>>>>>>>>>>>> API
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> should shield downstream projects from
> > > > > internals
> > > > > > > >> >> >>> and
> > > > > > > >> >> >>>>>> allow
> > > > > > > >> >> >>>>>>>>>>> them to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> interoperate with multiple future
> Flink
> > > > > versions
> > > > > > > >> >> >>> in
> > > > > > > >> >> >>>>> the
> > > > > > > >> >> >>>>>>> same
> > > > > > > >> >> >>>>>>>>>>>> release
> > > > > > > >> >> >>>>>>>>>>>>>>>> line
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> without forced upgrades.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> Thanks,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> Thomas
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> [1]
> > > > https://github.com/apache/flink/pull/7249
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> [2]
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> [3]
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> [4]
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff
> > Zhang <
> > > > > > > >> >> >>>>>>>> zjf...@gmail.com>
> > > > > > > >> >> >>>>>>>>>>>>> wrote:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user
> > should
> > > > be
> > > > > > > >> >> >>> able
> > > > > > > >> >> >>>> to
> > > > > > > >> >> >>>>>>>> define
> > > > > > > >> >> >>>>>>>>>>>> where
> > > > > > > >> >> >>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> runs (in your example Yarn). This is
> > > > actually
> > > > > > > >> >> >>>>>> independent
> > > > > > > >> >> >>>>>>>> of
> > > > > > > >> >> >>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> development and is something which is
> > > > decided
> > > > > > > >> >> >> at
> > > > > > > >> >> >>>>>>> deployment
> > > > > > > >> >> >>>>>>>>>>> time.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> User don't need to specify execution
> > mode
> > > > > > > >> >> >>>>>>> programmatically.
> > > > > > > >> >> >>>>>>>>>>> They
> > > > > > > >> >> >>>>>>>>>>>>> can
> > > > > > > >> >> >>>>>>>>>>>>>>>>> also
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> pass the execution mode from the
> > arguments
> > > > in
> > > > > > > >> >> >>> flink
> > > > > > > >> >> >>>>> run
> > > > > > > >> >> >>>>>>>>>>> command.
> > > > > > > >> >> >>>>>>>>>>>>> e.g.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m yarn-cluster ....
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m local ...
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m host:port ...
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Does this make sense to you ?
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> To me it makes sense that the
> > > > > > > >> >> >>>> ExecutionEnvironment
> > > > > > > >> >> >>>>>> is
> > > > > > > >> >> >>>>>>>> not
> > > > > > > >> >> >>>>>>>>>>>>>>>> directly
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> initialized by the user and instead
> > context
> > > > > > > >> >> >>>> sensitive
> > > > > > > >> >> >>>>>> how
> > > > > > > >> >> >>>>>>>> you
> > > > > > > >> >> >>>>>>>>>>>> want
> > > > > > > >> >> >>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> execute your job (Flink CLI vs. IDE,
> > for
> > > > > > > >> >> >>> example).
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Right, currently I notice Flink would
> > > > create
> > > > > > > >> >> >>>>> different
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> ContextExecutionEnvironment based on
> > > > > different
> > > > > > > >> >> >>>>>> submission
> > > > > > > >> >> >>>>>>>>>>>> scenarios
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> (Flink
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Cli vs IDE). To me this is kind of
> hack
> > > > > > > >> >> >> approach,
> > > > > > > >> >> >>>> not
> > > > > > > >> >> >>>>>> so
> > > > > > > >> >> >>>>>>>>>>>>>>>>> straightforward.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> What I suggested above is that is
> that
> > > > flink
> > > > > > > >> >> >>> should
> > > > > > > >> >> >>>>>>> always
> > > > > > > >> >> >>>>>>>>>>> create
> > > > > > > >> >> >>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> same
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> ExecutionEnvironment but with
> different
> > > > > > > >> >> >>>>> configuration,
> > > > > > > >> >> >>>>>>> and
> > > > > > > >> >> >>>>>>>>>>> based
> > > > > > > >> >> >>>>>>>>>>>> on
> > > > > > > >> >> >>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> configuration it would create the
> > proper
> > > > > > > >> >> >>>>> ClusterClient
> > > > > > > >> >> >>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>>> different
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> behaviors.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Till Rohrmann <trohrm...@apache.org>
> > > > > > > >> >> >>>> 于2018年12月20日周四
> > > > > > > >> >> >>>>>>>>>>> 下午11:18写道:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> You are probably right that we have
> > code
> > > > > > > >> >> >>>> duplication
> > > > > > > >> >> >>>>>>> when
> > > > > > > >> >> >>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>> comes
> > > > > > > >> >> >>>>>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> creation of the ClusterClient. This
> > should
> > > > > be
> > > > > > > >> >> >>>>> reduced
> > > > > > > >> >> >>>>>> in
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>> future.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user
> > should be
> > > > > > > >> >> >> able
> > > > > > > >> >> >>> to
> > > > > > > >> >> >>>>>>> define
> > > > > > > >> >> >>>>>>>>>>> where
> > > > > > > >> >> >>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> runs (in your example Yarn). This is
> > > > > actually
> > > > > > > >> >> >>>>>>> independent
> > > > > > > >> >> >>>>>>>>>>> of the
> > > > > > > >> >> >>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> development and is something which
> is
> > > > > decided
> > > > > > > >> >> >> at
> > > > > > > >> >> >>>>>>>> deployment
> > > > > > > >> >> >>>>>>>>>>>> time.
> > > > > > > >> >> >>>>>>>>>>>>>>>> To
> > > > > > > >> >> >>>>>>>>>>>>>>>>> me
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> makes sense that the
> > ExecutionEnvironment
> > > > is
> > > > > > > >> >> >> not
> > > > > > > >> >> >>>>>>> directly
> > > > > > > >> >> >>>>>>>>>>>>>>>> initialized
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> by
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> the user and instead context
> > sensitive how
> > > > > you
> > > > > > > >> >> >>>> want
> > > > > > > >> >> >>>>> to
> > > > > > > >> >> >>>>>>>>>>> execute
> > > > > > > >> >> >>>>>>>>>>>>> your
> > > > > > > >> >> >>>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> (Flink CLI vs. IDE, for example).
> > > > However, I
> > > > > > > >> >> >>> agree
> > > > > > > >> >> >>>>>> that
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> ExecutionEnvironment should give you
> > > > access
> > > > > to
> > > > > > > >> >> >>> the
> > > > > > > >> >> >>>>>>>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> job (maybe in the form of the
> > JobGraph or
> > > > a
> > > > > > > >> >> >> job
> > > > > > > >> >> >>>>> plan).
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> Cheers,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> Till
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff
> > > > Zhang <
> > > > > > > >> >> >>>>>>>>>>> zjf...@gmail.com>
> > > > > > > >> >> >>>>>>>>>>>>>>>> wrote:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Hi Till,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. You are
> > right
> > > > > that I
> > > > > > > >> >> >>>>> expect
> > > > > > > >> >> >>>>>>>> better
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> programmatic
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> job submission/control api which
> > could be
> > > > > > > >> >> >> used
> > > > > > > >> >> >>> by
> > > > > > > >> >> >>>>>>>>>>> downstream
> > > > > > > >> >> >>>>>>>>>>>>>>>>> project.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> And
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> it would benefit for the flink
> > ecosystem.
> > > > > > > >> >> >> When
> > > > > > > >> >> >>> I
> > > > > > > >> >> >>>>> look
> > > > > > > >> >> >>>>>>> at
> > > > > > > >> >> >>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>> code
> > > > > > > >> >> >>>>>>>>>>>>>>>>> of
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> flink
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> scala-shell and sql-client (I
> believe
> > > > they
> > > > > > > >> >> >> are
> > > > > > > >> >> >>>> not
> > > > > > > >> >> >>>>>> the
> > > > > > > >> >> >>>>>>>>>>> core of
> > > > > > > >> >> >>>>>>>>>>>>>>>>> flink,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> but
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> belong to the ecosystem of flink),
> I
> > find
> > > > > > > >> >> >> many
> > > > > > > >> >> >>>>>>> duplicated
> > > > > > > >> >> >>>>>>>>>>> code
> > > > > > > >> >> >>>>>>>>>>>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> creating
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> ClusterClient from user provided
> > > > > > > >> >> >> configuration
> > > > > > > >> >> >>>>>>>>>>> (configuration
> > > > > > > >> >> >>>>>>>>>>>>>>>>> format
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> may
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> be
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> different from scala-shell and
> > > > sql-client)
> > > > > > > >> >> >> and
> > > > > > > >> >> >>>> then
> > > > > > > >> >> >>>>>> use
> > > > > > > >> >> >>>>>>>>>>> that
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> to manipulate jobs. I don't think
> > this is
> > > > > > > >> >> >>>>> convenient
> > > > > > > >> >> >>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>>>>>> downstream
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> projects. What I expect is that
> > > > downstream
> > > > > > > >> >> >>>> project
> > > > > > > >> >> >>>>>> only
> > > > > > > >> >> >>>>>>>>>>> needs
> > > > > > > >> >> >>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> provide
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> necessary configuration info (maybe
> > > > > > > >> >> >> introducing
> > > > > > > >> >> >>>>> class
> > > > > > > >> >> >>>>>>>>>>>> FlinkConf),
> > > > > > > >> >> >>>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> build ExecutionEnvironment based on
> > this
> > > > > > > >> >> >>>> FlinkConf,
> > > > > > > >> >> >>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment will create
> the
> > > > proper
> > > > > > > >> >> >>>>>>>> ClusterClient.
> > > > > > > >> >> >>>>>>>>>>> It
> > > > > > > >> >> >>>>>>>>>>>> not
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> only
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> benefit for the downstream project
> > > > > > > >> >> >> development
> > > > > > > >> >> >>>> but
> > > > > > > >> >> >>>>>> also
> > > > > > > >> >> >>>>>>>> be
> > > > > > > >> >> >>>>>>>>>>>>>>>> helpful
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> their integration test with flink.
> > Here's
> > > > > one
> > > > > > > >> >> >>>>> sample
> > > > > > > >> >> >>>>>>> code
> > > > > > > >> >> >>>>>>>>>>>> snippet
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> that
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> I
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> expect.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val conf = new
> > FlinkConf().mode("yarn")
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val env = new
> > ExecutionEnvironment(conf)
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobId = env.submit(...)
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobStatus =
> > > > > > > >> >> >>>>>>>>>>> env.getClusterClient().queryJobStatus(jobId)
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > env.getClusterClient().cancelJob(jobId)
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> What do you think ?
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Till Rohrmann <
> trohrm...@apache.org>
> > > > > > > >> >> >>>>> 于2018年12月11日周二
> > > > > > > >> >> >>>>>>>>>>> 下午6:28写道:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Hi Jeff,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> what you are proposing is to
> > provide the
> > > > > > > >> >> >> user
> > > > > > > >> >> >>>> with
> > > > > > > >> >> >>>>>>>> better
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> programmatic
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> control. There was actually an
> > effort to
> > > > > > > >> >> >>> achieve
> > > > > > > >> >> >>>>>> this
> > > > > > > >> >> >>>>>>>> but
> > > > > > > >> >> >>>>>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>>>>>> has
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> never
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> been
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> completed [1]. However, there are
> > some
> > > > > > > >> >> >>>> improvement
> > > > > > > >> >> >>>>>> in
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>> code
> > > > > > > >> >> >>>>>>>>>>>>>>>>> base
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> now.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Look for example at the
> > NewClusterClient
> > > > > > > >> >> >>>> interface
> > > > > > > >> >> >>>>>>> which
> > > > > > > >> >> >>>>>>>>>>>>>>>> offers a
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> non-blocking job submission. But I
> > agree
> > > > > > > >> >> >> that
> > > > > > > >> >> >>> we
> > > > > > > >> >> >>>>>> need
> > > > > > > >> >> >>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>> improve
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Flink
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> in
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> this regard.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I would not be in favour if
> > exposing all
> > > > > > > >> >> >>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>> calls
> > > > > > > >> >> >>>>>>>>>>>>>>>> via
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment because it
> > would
> > > > > > > >> >> >> clutter
> > > > > > > >> >> >>>> the
> > > > > > > >> >> >>>>>>> class
> > > > > > > >> >> >>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>> would
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> not
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> be
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> a
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> good separation of concerns.
> > Instead one
> > > > > > > >> >> >> idea
> > > > > > > >> >> >>>>> could
> > > > > > > >> >> >>>>>> be
> > > > > > > >> >> >>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>> retrieve
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> current ClusterClient from the
> > > > > > > >> >> >>>>> ExecutionEnvironment
> > > > > > > >> >> >>>>>>>> which
> > > > > > > >> >> >>>>>>>>>>> can
> > > > > > > >> >> >>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> be
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> used
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> for cluster and job control. But
> > before
> > > > we
> > > > > > > >> >> >>> start
> > > > > > > >> >> >>>>> an
> > > > > > > >> >> >>>>>>>> effort
> > > > > > > >> >> >>>>>>>>>>>>>>>> here,
> > > > > > > >> >> >>>>>>>>>>>>>>>>> we
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> need
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> agree and capture what
> > functionality we
> > > > > want
> > > > > > > >> >> >>> to
> > > > > > > >> >> >>>>>>> provide.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Initially, the idea was that we
> > have the
> > > > > > > >> >> >>>>>>>> ClusterDescriptor
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> describing
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> how
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> to talk to cluster manager like
> > Yarn or
> > > > > > > >> >> >> Mesos.
> > > > > > > >> >> >>>> The
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> ClusterDescriptor
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> can
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> be
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> used for deploying Flink clusters
> > (job
> > > > and
> > > > > > > >> >> >>>>> session)
> > > > > > > >> >> >>>>>>> and
> > > > > > > >> >> >>>>>>>>>>> gives
> > > > > > > >> >> >>>>>>>>>>>>>>>>> you a
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> ClusterClient. The ClusterClient
> > > > controls
> > > > > > > >> >> >> the
> > > > > > > >> >> >>>>>> cluster
> > > > > > > >> >> >>>>>>>>>>> (e.g.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> submitting
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> jobs, listing all running jobs).
> And
> > > > then
> > > > > > > >> >> >>> there
> > > > > > > >> >> >>>>> was
> > > > > > > >> >> >>>>>>> the
> > > > > > > >> >> >>>>>>>>>>> idea
> > > > > > > >> >> >>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> introduce a
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> JobClient which you obtain from
> the
> > > > > > > >> >> >>>> ClusterClient
> > > > > > > >> >> >>>>> to
> > > > > > > >> >> >>>>>>>>>>> trigger
> > > > > > > >> >> >>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> specific
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> operations (e.g. taking a
> savepoint,
> > > > > > > >> >> >>> cancelling
> > > > > > > >> >> >>>>> the
> > > > > > > >> >> >>>>>>>> job).
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> [1]
> > > > > > > >> >> >>>>>> https://issues.apache.org/jira/browse/FLINK-4272
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Cheers,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> Till
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM
> > Jeff
> > > > > Zhang
> > > > > > > >> >> >> <
> > > > > > > >> >> >>>>>>>>>>> zjf...@gmail.com
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> wrote:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Hi Folks,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> I am trying to integrate flink
> into
> > > > > apache
> > > > > > > >> >> >>>>> zeppelin
> > > > > > > >> >> >>>>>>>>>>> which is
> > > > > > > >> >> >>>>>>>>>>>>>>>> an
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> interactive
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> notebook. And I hit several
> issues
> > that
> > > > > is
> > > > > > > >> >> >>>> caused
> > > > > > > >> >> >>>>>> by
> > > > > > > >> >> >>>>>>>>>>> flink
> > > > > > > >> >> >>>>>>>>>>>>>>>>> client
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> api.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> So
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> I'd like to proposal the
> following
> > > > > changes
> > > > > > > >> >> >>> for
> > > > > > > >> >> >>>>>> flink
> > > > > > > >> >> >>>>>>>>>>> client
> > > > > > > >> >> >>>>>>>>>>>>>>>>> api.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 1. Support nonblocking execution.
> > > > > > > >> >> >> Currently,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment#execute
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> is a blocking method which would
> > do 2
> > > > > > > >> >> >> things,
> > > > > > > >> >> >>>>> first
> > > > > > > >> >> >>>>>>>>>>> submit
> > > > > > > >> >> >>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> wait for job until it is
> finished.
> > I'd
> > > > > like
> > > > > > > >> >> >>>>>>> introduce a
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> nonblocking
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> execution method like
> > > > > > > >> >> >>>> ExecutionEnvironment#submit
> > > > > > > >> >> >>>>>>> which
> > > > > > > >> >> >>>>>>>>>>> only
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> submit
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> then return jobId to client. And
> > allow
> > > > > user
> > > > > > > >> >> >>> to
> > > > > > > >> >> >>>>>> query
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> status
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> via
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> jobId.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 2. Add cancel api in
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >> ExecutionEnvironment/StreamExecutionEnvironment,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> currently the only way to cancel
> > job is
> > > > > via
> > > > > > > >> >> >>> cli
> > > > > > > >> >> >>>>>>>>>>> (bin/flink),
> > > > > > > >> >> >>>>>>>>>>>>>>>>> this
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> is
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> not
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> convenient for downstream project
> > to
> > > > use
> > > > > > > >> >> >> this
> > > > > > > >> >> >>>>>>> feature.
> > > > > > > >> >> >>>>>>>>>>> So I'd
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> like
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> add
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> cancel api in
> ExecutionEnvironment
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 3. Add savepoint api in
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> It
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> is similar as cancel api, we
> > should use
> > > > > > > >> >> >>>>>>>>>>> ExecutionEnvironment
> > > > > > > >> >> >>>>>>>>>>>>>>>> as
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> unified
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> api for third party to integrate
> > with
> > > > > > > >> >> >> flink.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 4. Add listener for job execution
> > > > > > > >> >> >> lifecycle.
> > > > > > > >> >> >>>>>>> Something
> > > > > > > >> >> >>>>>>>>>>> like
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> following,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> so
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> that downstream project can do
> > custom
> > > > > logic
> > > > > > > >> >> >>> in
> > > > > > > >> >> >>>>> the
> > > > > > > >> >> >>>>>>>>>>> lifecycle
> > > > > > > >> >> >>>>>>>>>>>>>>>> of
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> job.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> e.g.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Zeppelin would capture the jobId
> > after
> > > > > job
> > > > > > > >> >> >> is
> > > > > > > >> >> >>>>>>> submitted
> > > > > > > >> >> >>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> use
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> this
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> jobId to cancel it later when
> > > > necessary.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> public interface JobListener {
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  void onJobSubmitted(JobID
> jobId);
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  void
> > onJobExecuted(JobExecutionResult
> > > > > > > >> >> >>>>> jobResult);
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  void onJobCanceled(JobID jobId);
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> }
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 5. Enable session in
> > > > > ExecutionEnvironment.
> > > > > > > >> >> >>>>>> Currently
> > > > > > > >> >> >>>>>>> it
> > > > > > > >> >> >>>>>>>>>>> is
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> disabled,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> but
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> session is very convenient for
> > third
> > > > > party
> > > > > > > >> >> >> to
> > > > > > > >> >> >>>>>>>> submitting
> > > > > > > >> >> >>>>>>>>>>> jobs
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> continually.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> I hope flink can enable it again.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 6. Unify all flink client api
> into
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> This is a long term issue which
> > needs
> > > > > more
> > > > > > > >> >> >>>>> careful
> > > > > > > >> >> >>>>>>>>>>> thinking
> > > > > > > >> >> >>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> design.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Currently some of features of
> > flink is
> > > > > > > >> >> >>> exposed
> > > > > > > >> >> >>>> in
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>> ExecutionEnvironment/StreamExecutionEnvironment,
> > > > > > > >> >> >>>>>> but
> > > > > > > >> >> >>>>>>>>>>> some are
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> exposed
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> in
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> cli instead of api, like the
> > cancel and
> > > > > > > >> >> >>>>> savepoint I
> > > > > > > >> >> >>>>>>>>>>> mentioned
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> above.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> I
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> think the root cause is due to
> that
> > > > flink
> > > > > > > >> >> >>>> didn't
> > > > > > > >> >> >>>>>>> unify
> > > > > > > >> >> >>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> interaction
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> with
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> flink. Here I list 3 scenarios of
> > flink
> > > > > > > >> >> >>>> operation
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  - Local job execution.  Flink
> will
> > > > > create
> > > > > > > >> >> >>>>>>>>>>> LocalEnvironment
> > > > > > > >> >> >>>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> use
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  this LocalEnvironment to create
> > > > > > > >> >> >>> LocalExecutor
> > > > > > > >> >> >>>>> for
> > > > > > > >> >> >>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> execution.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  - Remote job execution. Flink
> will
> > > > > create
> > > > > > > >> >> >>>>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>>>>>>>> first
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  create ContextEnvironment based
> > on the
> > > > > > > >> >> >>>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> run
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> job.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  - Job cancelation. Flink will
> > create
> > > > > > > >> >> >>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>> first
> > > > > > > >> >> >>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> then
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> cancel
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>  this job via this ClusterClient.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> As you can see in the above 3
> > > > scenarios.
> > > > > > > >> >> >>> Flink
> > > > > > > >> >> >>>>>> didn't
> > > > > > > >> >> >>>>>>>>>>> use the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> same
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> approach(code path) to interact
> > with
> > > > > flink
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> What I propose is following:
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Create the proper
> > > > > > > >> >> >>>>>> LocalEnvironment/RemoteEnvironment
> > > > > > > >> >> >>>>>>>>>>> (based
> > > > > > > >> >> >>>>>>>>>>>>>>>> on
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> user
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> configuration) --> Use this
> > Environment
> > > > > to
> > > > > > > >> >> >>>> create
> > > > > > > >> >> >>>>>>>> proper
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> ClusterClient
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> (LocalClusterClient or
> > > > RestClusterClient)
> > > > > > > >> >> >> to
> > > > > > > >> >> >>>>>>>> interactive
> > > > > > > >> >> >>>>>>>>>>> with
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Flink (
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> job
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> execution or cancelation)
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> This way we can unify the process
> > of
> > > > > local
> > > > > > > >> >> >>>>>> execution
> > > > > > > >> >> >>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>> remote
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> execution.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> And it is much easier for third
> > party
> > > > to
> > > > > > > >> >> >>>>> integrate
> > > > > > > >> >> >>>>>>> with
> > > > > > > >> >> >>>>>>>>>>>>>>>> flink,
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> because
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment is the
> unified
> > > > entry
> > > > > > > >> >> >>> point
> > > > > > > >> >> >>>>> for
> > > > > > > >> >> >>>>>>>>>>> flink.
> > > > > > > >> >> >>>>>>>>>>>>>>>> What
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> third
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> party
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> needs to do is just pass
> > configuration
> > > > to
> > > > > > > >> >> >>>>>>>>>>>>>>>> ExecutionEnvironment
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> and
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ExecutionEnvironment will do the
> > right
> > > > > > > >> >> >> thing
> > > > > > > >> >> >>>>> based
> > > > > > > >> >> >>>>>> on
> > > > > > > >> >> >>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> configuration.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Flink cli can also be considered
> as
> > > > flink
> > > > > > > >> >> >> api
> > > > > > > >> >> >>>>>>> consumer.
> > > > > > > >> >> >>>>>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>>>>>> just
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> pass
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> the
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> configuration to
> > ExecutionEnvironment
> > > > and
> > > > > > > >> >> >> let
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> ExecutionEnvironment
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> create the proper ClusterClient
> > instead
> > > > > of
> > > > > > > >> >> >>>>> letting
> > > > > > > >> >> >>>>>>> cli
> > > > > > > >> >> >>>>>>>> to
> > > > > > > >> >> >>>>>>>>>>>>>>>>> create
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> ClusterClient directly.
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> 6 would involve large code
> > refactoring,
> > > > > so
> > > > > > > >> >> >> I
> > > > > > > >> >> >>>>> think
> > > > > > > >> >> >>>>>> we
> > > > > > > >> >> >>>>>>>> can
> > > > > > > >> >> >>>>>>>>>>>>>>>> defer
> > > > > > > >> >> >>>>>>>>>>>>>>>>>> it
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> for
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> future release, 1,2,3,4,5 could
> be
> > done
> > > > > at
> > > > > > > >> >> >>>> once I
> > > > > > > >> >> >>>>>>>>>>> believe.
> > > > > > > >> >> >>>>>>>>>>>>>>>> Let
> > > > > > > >> >> >>>>>>>>>>>>>>>>> me
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> know
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> your
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> comments and feedback, thanks
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> --
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> --
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> --
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>> --
> > > > > > > >> >> >>>>>>>>>>>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>> --
> > > > > > > >> >> >>>>>>>>>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>> --
> > > > > > > >> >> >>>>>>>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>>
> > > > > > > >> >> >>>>>>>>>>
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>> --
> > > > > > > >> >> >>>>>>>> Best Regards
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>> Jeff Zhang
> > > > > > > >> >> >>>>>>>>
> > > > > > > >> >> >>>>>>>
> > > > > > > >> >> >>>>>>
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>> --
> > > > > > > >> >> >>>>> Best Regards
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>> Jeff Zhang
> > > > > > > >> >> >>>>>
> > > > > > > >> >> >>>>
> > > > > > > >> >> >>>
> > > > > > > >> >> >>
> > > > > > > >> >> >
> > > > > > > >> >> >
> > > > > > > >> >> > --
> > > > > > > >> >> > Best Regards
> > > > > > > >> >> >
> > > > > > > >> >> > Jeff Zhang
> > > > > > > >> >>
> > > > > > > >> >>
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> >
>

Reply via email to