Hi folks,

Sorry for late response, It seems we reach consensus on this, I will create
FLIP for this with more detailed design


Thomas Weise <t...@apache.org> 于2018年12月21日周五 上午11:43写道:

> Great to see this discussion seeded! The problems you face with the
> Zeppelin integration are also affecting other downstream projects, like
> Beam.
>
> We just enabled the savepoint restore option in RemoteStreamEnvironment [1]
> and that was more difficult than it should be. The main issue is that
> environment and cluster client aren't decoupled. Ideally it should be
> possible to just get the matching cluster client from the environment and
> then control the job through it (environment as factory for cluster
> client). But note that the environment classes are part of the public API,
> and it is not straightforward to make larger changes without breaking
> backward compatibility.
>
> ClusterClient currently exposes internal classes like JobGraph and
> StreamGraph. But it should be possible to wrap this with a new public API
> that brings the required job control capabilities for downstream projects.
> Perhaps it is helpful to look at some of the interfaces in Beam while
> thinking about this: [2] for the portable job API and [3] for the old
> asynchronous job control from the Beam Java SDK.
>
> The backward compatibility discussion [4] is also relevant here. A new API
> should shield downstream projects from internals and allow them to
> interoperate with multiple future Flink versions in the same release line
> without forced upgrades.
>
> Thanks,
> Thomas
>
> [1] https://github.com/apache/flink/pull/7249
> [2]
>
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> [3]
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> [4]
>
> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
>
>
> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > >>> I'm not so sure whether the user should be able to define where the
> job
> > runs (in your example Yarn). This is actually independent of the job
> > development and is something which is decided at deployment time.
> >
> > User don't need to specify execution mode programmatically. They can also
> > pass the execution mode from the arguments in flink run command. e.g.
> >
> > bin/flink run -m yarn-cluster ....
> > bin/flink run -m local ...
> > bin/flink run -m host:port ...
> >
> > Does this make sense to you ?
> >
> > >>> To me it makes sense that the ExecutionEnvironment is not directly
> > initialized by the user and instead context sensitive how you want to
> > execute your job (Flink CLI vs. IDE, for example).
> >
> > Right, currently I notice Flink would create different
> > ContextExecutionEnvironment based on different submission scenarios
> (Flink
> > Cli vs IDE). To me this is kind of hack approach, not so straightforward.
> > What I suggested above is that is that flink should always create the
> same
> > ExecutionEnvironment but with different configuration, and based on the
> > configuration it would create the proper ClusterClient for different
> > behaviors.
> >
> >
> >
> >
> >
> >
> >
> > Till Rohrmann <trohrm...@apache.org> 于2018年12月20日周四 下午11:18写道:
> >
> > > You are probably right that we have code duplication when it comes to
> the
> > > creation of the ClusterClient. This should be reduced in the future.
> > >
> > > I'm not so sure whether the user should be able to define where the job
> > > runs (in your example Yarn). This is actually independent of the job
> > > development and is something which is decided at deployment time. To me
> > it
> > > makes sense that the ExecutionEnvironment is not directly initialized
> by
> > > the user and instead context sensitive how you want to execute your job
> > > (Flink CLI vs. IDE, for example). However, I agree that the
> > > ExecutionEnvironment should give you access to the ClusterClient and to
> > the
> > > job (maybe in the form of the JobGraph or a job plan).
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > > Hi Till,
> > > > Thanks for the feedback. You are right that I expect better
> > programmatic
> > > > job submission/control api which could be used by downstream project.
> > And
> > > > it would benefit for the flink ecosystem. When I look at the code of
> > > flink
> > > > scala-shell and sql-client (I believe they are not the core of flink,
> > but
> > > > belong to the ecosystem of flink), I find many duplicated code for
> > > creating
> > > > ClusterClient from user provided configuration (configuration format
> > may
> > > be
> > > > different from scala-shell and sql-client) and then use that
> > > ClusterClient
> > > > to manipulate jobs. I don't think this is convenient for downstream
> > > > projects. What I expect is that downstream project only needs to
> > provide
> > > > necessary configuration info (maybe introducing class FlinkConf), and
> > > then
> > > > build ExecutionEnvironment based on this FlinkConf, and
> > > > ExecutionEnvironment will create the proper ClusterClient. It not
> only
> > > > benefit for the downstream project development but also be helpful
> for
> > > > their integration test with flink. Here's one sample code snippet
> that
> > I
> > > > expect.
> > > >
> > > > val conf = new FlinkConf().mode("yarn")
> > > > val env = new ExecutionEnvironment(conf)
> > > > val jobId = env.submit(...)
> > > > val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > env.getClusterClient().cancelJob(jobId)
> > > >
> > > > What do you think ?
> > > >
> > > >
> > > >
> > > >
> > > > Till Rohrmann <trohrm...@apache.org> 于2018年12月11日周二 下午6:28写道:
> > > >
> > > > > Hi Jeff,
> > > > >
> > > > > what you are proposing is to provide the user with better
> > programmatic
> > > > job
> > > > > control. There was actually an effort to achieve this but it has
> > never
> > > > been
> > > > > completed [1]. However, there are some improvement in the code base
> > > now.
> > > > > Look for example at the NewClusterClient interface which offers a
> > > > > non-blocking job submission. But I agree that we need to improve
> > Flink
> > > in
> > > > > this regard.
> > > > >
> > > > > I would not be in favour if exposing all ClusterClient calls via
> the
> > > > > ExecutionEnvironment because it would clutter the class and would
> not
> > > be
> > > > a
> > > > > good separation of concerns. Instead one idea could be to retrieve
> > the
> > > > > current ClusterClient from the ExecutionEnvironment which can then
> be
> > > > used
> > > > > for cluster and job control. But before we start an effort here, we
> > > need
> > > > to
> > > > > agree and capture what functionality we want to provide.
> > > > >
> > > > > Initially, the idea was that we have the ClusterDescriptor
> describing
> > > how
> > > > > to talk to cluster manager like Yarn or Mesos. The
> ClusterDescriptor
> > > can
> > > > be
> > > > > used for deploying Flink clusters (job and session) and gives you a
> > > > > ClusterClient. The ClusterClient controls the cluster (e.g.
> > submitting
> > > > > jobs, listing all running jobs). And then there was the idea to
> > > > introduce a
> > > > > JobClient which you obtain from the ClusterClient to trigger job
> > > specific
> > > > > operations (e.g. taking a savepoint, cancelling the job).
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi Folks,
> > > > > >
> > > > > > I am trying to integrate flink into apache zeppelin which is an
> > > > > interactive
> > > > > > notebook. And I hit several issues that is caused by flink client
> > > api.
> > > > So
> > > > > > I'd like to proposal the following changes for flink client api.
> > > > > >
> > > > > > 1. Support nonblocking execution. Currently,
> > > > ExecutionEnvironment#execute
> > > > > > is a blocking method which would do 2 things, first submit job
> and
> > > then
> > > > > > wait for job until it is finished. I'd like introduce a
> nonblocking
> > > > > > execution method like ExecutionEnvironment#submit which only
> submit
> > > job
> > > > > and
> > > > > > then return jobId to client. And allow user to query the job
> status
> > > via
> > > > > the
> > > > > > jobId.
> > > > > >
> > > > > > 2. Add cancel api in
> > ExecutionEnvironment/StreamExecutionEnvironment,
> > > > > > currently the only way to cancel job is via cli (bin/flink), this
> > is
> > > > not
> > > > > > convenient for downstream project to use this feature. So I'd
> like
> > to
> > > > add
> > > > > > cancel api in ExecutionEnvironment
> > > > > >
> > > > > > 3. Add savepoint api in
> > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > It
> > > > > > is similar as cancel api, we should use ExecutionEnvironment as
> the
> > > > > unified
> > > > > > api for third party to integrate with flink.
> > > > > >
> > > > > > 4. Add listener for job execution lifecycle. Something like
> > > following,
> > > > so
> > > > > > that downstream project can do custom logic in the lifecycle of
> > job.
> > > > e.g.
> > > > > > Zeppelin would capture the jobId after job is submitted and then
> > use
> > > > this
> > > > > > jobId to cancel it later when necessary.
> > > > > >
> > > > > > public interface JobListener {
> > > > > >
> > > > > >    void onJobSubmitted(JobID jobId);
> > > > > >
> > > > > >    void onJobExecuted(JobExecutionResult jobResult);
> > > > > >
> > > > > >    void onJobCanceled(JobID jobId);
> > > > > > }
> > > > > >
> > > > > > 5. Enable session in ExecutionEnvironment. Currently it is
> > disabled,
> > > > but
> > > > > > session is very convenient for third party to submitting jobs
> > > > > continually.
> > > > > > I hope flink can enable it again.
> > > > > >
> > > > > > 6. Unify all flink client api into
> > > > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > >
> > > > > > This is a long term issue which needs more careful thinking and
> > > design.
> > > > > > Currently some of features of flink is exposed in
> > > > > > ExecutionEnvironment/StreamExecutionEnvironment, but some are
> > exposed
> > > > in
> > > > > > cli instead of api, like the cancel and savepoint I mentioned
> > above.
> > > I
> > > > > > think the root cause is due to that flink didn't unify the
> > > interaction
> > > > > with
> > > > > > flink. Here I list 3 scenarios of flink operation
> > > > > >
> > > > > >    - Local job execution.  Flink will create LocalEnvironment and
> > > then
> > > > > use
> > > > > >    this LocalEnvironment to create LocalExecutor for job
> execution.
> > > > > >    - Remote job execution. Flink will create ClusterClient first
> > and
> > > > then
> > > > > >    create ContextEnvironment based on the ClusterClient and then
> > run
> > > > the
> > > > > > job.
> > > > > >    - Job cancelation. Flink will create ClusterClient first and
> > then
> > > > > cancel
> > > > > >    this job via this ClusterClient.
> > > > > >
> > > > > > As you can see in the above 3 scenarios. Flink didn't use the
> same
> > > > > > approach(code path) to interact with flink
> > > > > > What I propose is following:
> > > > > > Create the proper LocalEnvironment/RemoteEnvironment (based on
> user
> > > > > > configuration) --> Use this Environment to create proper
> > > ClusterClient
> > > > > > (LocalClusterClient or RestClusterClient) to interactive with
> > Flink (
> > > > job
> > > > > > execution or cancelation)
> > > > > >
> > > > > > This way we can unify the process of local execution and remote
> > > > > execution.
> > > > > > And it is much easier for third party to integrate with flink,
> > > because
> > > > > > ExecutionEnvironment is the unified entry point for flink. What
> > third
> > > > > party
> > > > > > needs to do is just pass configuration to ExecutionEnvironment
> and
> > > > > > ExecutionEnvironment will do the right thing based on the
> > > > configuration.
> > > > > > Flink cli can also be considered as flink api consumer. it just
> > pass
> > > > the
> > > > > > configuration to ExecutionEnvironment and let
> ExecutionEnvironment
> > to
> > > > > > create the proper ClusterClient instead of letting cli to create
> > > > > > ClusterClient directly.
> > > > > >
> > > > > >
> > > > > > 6 would involve large code refactoring, so I think we can defer
> it
> > > for
> > > > > > future release, 1,2,3,4,5 could be done at once I believe. Let me
> > > know
> > > > > your
> > > > > > comments and feedback, thanks
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best Regards
> > > > > >
> > > > > > Jeff Zhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


-- 
Best Regards

Jeff Zhang

Reply via email to