>>> Any API we expose should not have dependencies on the runtime
(flink-runtime) package or other implementation details. To me, this means
that the current ClusterClient cannot be exposed to users because it uses
quite some classes from the optimiser and runtime packages.

We should change ClusterClient from a class to an interface.
ExecutionEnvironment would only use the ClusterClient interface, which
should live in flink-clients, while the concrete implementation class could
live in flink-runtime.
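
To make the split concrete, here is a rough sketch of what I have in mind.
All names and signatures below are hypothetical and are not the existing
Flink classes. Job ids and statuses are plain strings and the implementation
is an in-memory stand-in, only so that the example is self-contained. The
point is that the interface mentions no runtime types at all.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical user-facing interface; this is the part that would live in
// flink-clients. No JobGraph, StreamGraph or other runtime type appears here.
interface ClusterClient {
    CompletableFuture<String> submitJob(String jobName);

    CompletableFuture<String> getJobStatus(String jobId);

    CompletableFuture<Void> cancelJob(String jobId);
}

// Stand-in for the concrete class that would live in flink-runtime.
// It only tracks job status in memory; a real one would talk to the cluster.
final class InMemoryClusterClient implements ClusterClient {
    private final Map<String, String> statusByJobId = new ConcurrentHashMap<>();
    private int nextId = 0;

    @Override
    public synchronized CompletableFuture<String> submitJob(String jobName) {
        String jobId = jobName + "-" + (nextId++);
        statusByJobId.put(jobId, "RUNNING");
        return CompletableFuture.completedFuture(jobId);
    }

    @Override
    public CompletableFuture<String> getJobStatus(String jobId) {
        return CompletableFuture.completedFuture(
                statusByJobId.getOrDefault(jobId, "UNKNOWN"));
    }

    @Override
    public CompletableFuture<Void> cancelJob(String jobId) {
        // Only jobs that were actually submitted change state.
        statusByJobId.computeIfPresent(jobId, (id, status) -> "CANCELED");
        return CompletableFuture.completedFuture(null);
    }
}

final class ClusterClientSketch {
    public static void main(String[] args) {
        // Caller code depends on the interface only.
        ClusterClient client = new InMemoryClusterClient();
        String jobId = client.submitJob("wordcount").join();
        System.out.println(client.getJobStatus(jobId).join()); // RUNNING
        client.cancelJob(jobId).join();
        System.out.println(client.getJobStatus(jobId).join()); // CANCELED
    }
}
```

With this shape, ExecutionEnvironment and downstream projects would compile
against the interface only, and the implementation could change without
breaking them.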

>>> What happens when a failure/restart in the client happens? There need
to be a way of re-establishing the connection to the job, set up the
listeners again, etc.

Good point. First we need to define what failure/restart in the client
means. IIUC, it usually means a network failure, which would surface in the
RestClient class. If my understanding is correct, the restart/retry
mechanism should be implemented in RestClient.
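
As a rough illustration of the kind of retry I mean, here is a minimal
sketch. It is not the actual RestClient code. The helper name, its
parameters, and the choice of exponential backoff are all my assumptions;
it only shows retrying a request on IOException, which is how I assume a
network failure would surface.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Flink: retries a request that fails with
// an IOException, doubling the wait between attempts.
final class RetryingCall {
    private RetryingCall() {}

    static <T> T callWithRetry(Callable<T> request, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoffMillis = initialBackoffMillis;
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return request.call();
            } catch (IOException e) {
                // Treated as a transient network failure; anything else propagates.
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                    backoffMillis *= 2;
                }
            }
        }
        // All attempts failed with an IOException; report the last one.
        throw lastFailure;
    }
}

final class RetrySketch {
    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated status query that fails twice before succeeding.
        String status = RetryingCall.callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection reset");
            }
            return "RUNNING";
        }, 5, 10L);
        System.out.println(status + " after " + calls[0] + " attempts");
    }
}
```

This only covers re-sending a request. Setting up the listeners again after
a client restart would still need separate handling.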





On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Some points to consider:
>
> * Any API we expose should not have dependencies on the runtime
> (flink-runtime) package or other implementation details. To me, this means
> that the current ClusterClient cannot be exposed to users because it   uses
> quite some classes from the optimiser and runtime packages.
>
> * What happens when a failure/restart in the client happens? There need to
> be a way of re-establishing the connection to the job, set up the listeners
> again, etc.
>
> Aljoscha
>
> > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> >
> > Sorry folks, the design doc is late as you expected. Here's the design doc
> > I drafted; any comments and feedback are welcome.
> >
> >
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> >
> >
> >
> > On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> >
> >> Nice that this discussion is happening.
> >>
> >> In the FLIP, we could also revisit the entire role of the environments
> >> again.
> >>
> >> Initially, the idea was:
> >>  - the environments take care of the specific setup for standalone (no
> >> setup needed), yarn, mesos, etc.
> >>  - the session ones have control over the session. The environment holds
> >> the session client.
> >>  - running a job gives a "control" object for that job. That behavior is
> >> the same in all environments.
> >>
> >> The actual implementation diverged quite a bit from that. Happy to see a
> >> discussion about straightening this out a bit more.
> >>
> >>
> >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >>
> >>> Hi folks,
> >>>
> >>> Sorry for the late response. It seems we have reached consensus on this,
> >>> so I will create a FLIP for this with a more detailed design.
> >>>
> >>>
> >>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> >>>
> >>>> Great to see this discussion seeded! The problems you face with the
> >>>> Zeppelin integration are also affecting other downstream projects,
> like
> >>>> Beam.
> >>>>
> >>>> We just enabled the savepoint restore option in
> RemoteStreamEnvironment
> >>> [1]
> >>>> and that was more difficult than it should be. The main issue is that
> >>>> environment and cluster client aren't decoupled. Ideally it should be
> >>>> possible to just get the matching cluster client from the environment
> >> and
> >>>> then control the job through it (environment as factory for cluster
> >>>> client). But note that the environment classes are part of the public
> >>> API,
> >>>> and it is not straightforward to make larger changes without breaking
> >>>> backward compatibility.
> >>>>
> >>>> ClusterClient currently exposes internal classes like JobGraph and
> >>>> StreamGraph. But it should be possible to wrap this with a new public
> >> API
> >>>> that brings the required job control capabilities for downstream
> >>> projects.
> >>>> Perhaps it is helpful to look at some of the interfaces in Beam while
> >>>> thinking about this: [2] for the portable job API and [3] for the old
> >>>> asynchronous job control from the Beam Java SDK.
> >>>>
> >>>> The backward compatibility discussion [4] is also relevant here. A new
> >>> API
> >>>> should shield downstream projects from internals and allow them to
> >>>> interoperate with multiple future Flink versions in the same release
> >> line
> >>>> without forced upgrades.
> >>>>
> >>>> Thanks,
> >>>> Thomas
> >>>>
> >>>> [1] https://github.com/apache/flink/pull/7249
> >>>> [2]
> >>>>
> >>>>
> >>>
> >>
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> >>>> [3]
> >>>>
> >>>>
> >>>
> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> >>>> [4]
> >>>>
> >>>>
> >>>
> >>
> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> >>>>
> >>>>
> >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >>>>
> >>>>>>>> I'm not so sure whether the user should be able to define where
> >> the
> >>>> job
> >>>>> runs (in your example Yarn). This is actually independent of the job
> >>>>> development and is something which is decided at deployment time.
> >>>>>
> >>>>> Users don't need to specify the execution mode programmatically. They can
> >>>>> also pass the execution mode as an argument to the flink run command, e.g.
> >>>>>
> >>>>> bin/flink run -m yarn-cluster ....
> >>>>> bin/flink run -m local ...
> >>>>> bin/flink run -m host:port ...
> >>>>>
> >>>>> Does this make sense to you ?
> >>>>>
> >>>>>>>> To me it makes sense that the ExecutionEnvironment is not
> >> directly
> >>>>> initialized by the user and instead context sensitive how you want to
> >>>>> execute your job (Flink CLI vs. IDE, for example).
> >>>>>
> >>>>> Right, currently I notice Flink creates a different
> >>>>> ContextExecutionEnvironment depending on the submission scenario (Flink
> >>>>> CLI vs. IDE). To me this is a somewhat hacky approach and not very
> >>>>> straightforward. What I suggested above is that Flink should always
> >>>>> create the same ExecutionEnvironment but with different configuration,
> >>>>> and based on that configuration it would create the proper ClusterClient
> >>>>> for the different behaviors.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>
> >>>>>> You are probably right that we have code duplication when it comes
> >> to
> >>>> the
> >>>>>> creation of the ClusterClient. This should be reduced in the
> >> future.
> >>>>>>
> >>>>>> I'm not so sure whether the user should be able to define where the
> >>> job
> >>>>>> runs (in your example Yarn). This is actually independent of the
> >> job
> >>>>>> development and is something which is decided at deployment time.
> >> To
> >>> me
> >>>>> it
> >>>>>> makes sense that the ExecutionEnvironment is not directly
> >> initialized
> >>>> by
> >>>>>> the user and instead context sensitive how you want to execute your
> >>> job
> >>>>>> (Flink CLI vs. IDE, for example). However, I agree that the
> >>>>>> ExecutionEnvironment should give you access to the ClusterClient
> >> and
> >>> to
> >>>>> the
> >>>>>> job (maybe in the form of the JobGraph or a job plan).
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Till
> >>>>>>
> >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>>> Hi Till,
> >>>>>>> Thanks for the feedback. You are right that I expect better
> >>>>> programmatic
> >>>>>>> job submission/control api which could be used by downstream
> >>> project.
> >>>>> And
> >>>>>>> it would benefit for the flink ecosystem. When I look at the code
> >>> of
> >>>>>> flink
> >>>>>>> scala-shell and sql-client (I believe they are not the core of
> >>> flink,
> >>>>> but
> >>>>>>> belong to the ecosystem of flink), I find many duplicated code
> >> for
> >>>>>> creating
> >>>>>>> ClusterClient from user provided configuration (configuration
> >>> format
> >>>>> may
> >>>>>> be
> >>>>>>> different from scala-shell and sql-client) and then use that
> >>>>>> ClusterClient
> >>>>>>> to manipulate jobs. I don't think this is convenient for
> >> downstream
> >>>>>>> projects. What I expect is that downstream project only needs to
> >>>>> provide
> >>>>>>> necessary configuration info (maybe introducing class FlinkConf),
> >>> and
> >>>>>> then
> >>>>>>> build ExecutionEnvironment based on this FlinkConf, and
> >>>>>>> ExecutionEnvironment will create the proper ClusterClient. It not
> >>>> only
> >>>>>>> benefit for the downstream project development but also be
> >> helpful
> >>>> for
> >>>>>>> their integration test with flink. Here's one sample code snippet
> >>>> that
> >>>>> I
> >>>>>>> expect.
> >>>>>>>
> >>>>>>> val conf = new FlinkConf().mode("yarn")
> >>>>>>> val env = new ExecutionEnvironment(conf)
> >>>>>>> val jobId = env.submit(...)
> >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> >>>>>>> env.getClusterClient().cancelJob(jobId)
> >>>>>>>
> >>>>>>> What do you think ?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi Jeff,
> >>>>>>>>
> >>>>>>>> what you are proposing is to provide the user with better
> >>>>> programmatic
> >>>>>>> job
> >>>>>>>> control. There was actually an effort to achieve this but it
> >> has
> >>>>> never
> >>>>>>> been
> >>>>>>>> completed [1]. However, there are some improvement in the code
> >>> base
> >>>>>> now.
> >>>>>>>> Look for example at the NewClusterClient interface which
> >> offers a
> >>>>>>>> non-blocking job submission. But I agree that we need to
> >> improve
> >>>>> Flink
> >>>>>> in
> >>>>>>>> this regard.
> >>>>>>>>
> >>>>>>>> I would not be in favour if exposing all ClusterClient calls
> >> via
> >>>> the
> >>>>>>>> ExecutionEnvironment because it would clutter the class and
> >> would
> >>>> not
> >>>>>> be
> >>>>>>> a
> >>>>>>>> good separation of concerns. Instead one idea could be to
> >>> retrieve
> >>>>> the
> >>>>>>>> current ClusterClient from the ExecutionEnvironment which can
> >>> then
> >>>> be
> >>>>>>> used
> >>>>>>>> for cluster and job control. But before we start an effort
> >> here,
> >>> we
> >>>>>> need
> >>>>>>> to
> >>>>>>>> agree and capture what functionality we want to provide.
> >>>>>>>>
> >>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> >>>> describing
> >>>>>> how
> >>>>>>>> to talk to cluster manager like Yarn or Mesos. The
> >>>> ClusterDescriptor
> >>>>>> can
> >>>>>>> be
> >>>>>>>> used for deploying Flink clusters (job and session) and gives
> >>> you a
> >>>>>>>> ClusterClient. The ClusterClient controls the cluster (e.g.
> >>>>> submitting
> >>>>>>>> jobs, listing all running jobs). And then there was the idea to
> >>>>>>> introduce a
> >>>>>>>> JobClient which you obtain from the ClusterClient to trigger
> >> job
> >>>>>> specific
> >>>>>>>> operations (e.g. taking a savepoint, cancelling the job).
> >>>>>>>>
> >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Till
> >>>>>>>>
> >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Folks,
> >>>>>>>>>
> >>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is an
> >>>>>>>>> interactive notebook, and I hit several issues that are caused by the
> >>>>>>>>> Flink client API. So I'd like to propose the following changes to the
> >>>>>>>>> Flink client API.
> >>>>>>>>>
> >>>>>>>>> 1. Support nonblocking execution. Currently,
> >>>>>>> ExecutionEnvironment#execute
> >>>>>>>>> is a blocking method which would do 2 things, first submit
> >> job
> >>>> and
> >>>>>> then
> >>>>>>>>> wait for job until it is finished. I'd like introduce a
> >>>> nonblocking
> >>>>>>>>> execution method like ExecutionEnvironment#submit which only
> >>>> submit
> >>>>>> job
> >>>>>>>> and
> >>>>>>>>> then return jobId to client. And allow user to query the job
> >>>> status
> >>>>>> via
> >>>>>>>> the
> >>>>>>>>> jobId.
> >>>>>>>>>
> >>>>>>>>> 2. Add cancel api in
> >>>>> ExecutionEnvironment/StreamExecutionEnvironment,
> >>>>>>>>> currently the only way to cancel job is via cli (bin/flink),
> >>> this
> >>>>> is
> >>>>>>> not
> >>>>>>>>> convenient for downstream project to use this feature. So I'd
> >>>> like
> >>>>> to
> >>>>>>> add
> >>>>>>>>> cancel api in ExecutionEnvironment
> >>>>>>>>>
> >>>>>>>>> 3. Add savepoint api in
> >>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> >>>>>>>> It
> >>>>>>>>> is similar as cancel api, we should use ExecutionEnvironment
> >> as
> >>>> the
> >>>>>>>> unified
> >>>>>>>>> api for third party to integrate with flink.
> >>>>>>>>>
> >>>>>>>>> 4. Add listener for job execution lifecycle. Something like
> >>>>>> following,
> >>>>>>> so
> >>>>>>>>> that downstream project can do custom logic in the lifecycle
> >> of
> >>>>> job.
> >>>>>>> e.g.
> >>>>>>>>> Zeppelin would capture the jobId after job is submitted and
> >>> then
> >>>>> use
> >>>>>>> this
> >>>>>>>>> jobId to cancel it later when necessary.
> >>>>>>>>>
> >>>>>>>>> public interface JobListener {
> >>>>>>>>>
> >>>>>>>>>   void onJobSubmitted(JobID jobId);
> >>>>>>>>>
> >>>>>>>>>   void onJobExecuted(JobExecutionResult jobResult);
> >>>>>>>>>
> >>>>>>>>>   void onJobCanceled(JobID jobId);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> 5. Enable session in ExecutionEnvironment. Currently it is
> >>>>> disabled,
> >>>>>>> but
> >>>>>>>>> session is very convenient for third party to submitting jobs
> >>>>>>>> continually.
> >>>>>>>>> I hope flink can enable it again.
> >>>>>>>>>
> >>>>>>>>> 6. Unify all flink client api into
> >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> >>>>>>>>>
> >>>>>>>>> This is a long term issue which needs more careful thinking
> >> and
> >>>>>> design.
> >>>>>>>>> Currently some of features of flink is exposed in
> >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are
> >>>>> exposed
> >>>>>>> in
> >>>>>>>>> cli instead of api, like the cancel and savepoint I mentioned
> >>>>> above.
> >>>>>> I
> >>>>>>>>> think the root cause is due to that flink didn't unify the
> >>>>>> interaction
> >>>>>>>> with
> >>>>>>>>> flink. Here I list 3 scenarios of flink operation
> >>>>>>>>>
> >>>>>>>>>   - Local job execution.  Flink will create LocalEnvironment
> >>> and
> >>>>>> then
> >>>>>>>> use
> >>>>>>>>>   this LocalEnvironment to create LocalExecutor for job
> >>>> execution.
> >>>>>>>>>   - Remote job execution. Flink will create ClusterClient
> >>> first
> >>>>> and
> >>>>>>> then
> >>>>>>>>>   create ContextEnvironment based on the ClusterClient and
> >>> then
> >>>>> run
> >>>>>>> the
> >>>>>>>>> job.
> >>>>>>>>>   - Job cancelation. Flink will create ClusterClient first
> >> and
> >>>>> then
> >>>>>>>> cancel
> >>>>>>>>>   this job via this ClusterClient.
> >>>>>>>>>
> >>>>>>>>> As you can see in the above 3 scenarios. Flink didn't use the
> >>>> same
> >>>>>>>>> approach(code path) to interact with flink
> >>>>>>>>> What I propose is following:
> >>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based
> >> on
> >>>> user
> >>>>>>>>> configuration) --> Use this Environment to create proper
> >>>>>> ClusterClient
> >>>>>>>>> (LocalClusterClient or RestClusterClient) to interactive with
> >>>>> Flink (
> >>>>>>> job
> >>>>>>>>> execution or cancelation)
> >>>>>>>>>
> >>>>>>>>> This way we can unify the process of local execution and
> >> remote
> >>>>>>>> execution.
> >>>>>>>>> And it is much easier for third party to integrate with
> >> flink,
> >>>>>> because
> >>>>>>>>> ExecutionEnvironment is the unified entry point for flink.
> >> What
> >>>>> third
> >>>>>>>> party
> >>>>>>>>> needs to do is just pass configuration to
> >> ExecutionEnvironment
> >>>> and
> >>>>>>>>> ExecutionEnvironment will do the right thing based on the
> >>>>>>> configuration.
> >>>>>>>>> Flink cli can also be considered as flink api consumer. it
> >> just
> >>>>> pass
> >>>>>>> the
> >>>>>>>>> configuration to ExecutionEnvironment and let
> >>>> ExecutionEnvironment
> >>>>> to
> >>>>>>>>> create the proper ClusterClient instead of letting cli to
> >>> create
> >>>>>>>>> ClusterClient directly.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 6 would involve large code refactoring, so I think we can
> >> defer
> >>>> it
> >>>>>> for
> >>>>>>>>> future release, 1,2,3,4,5 could be done at once I believe.
> >> Let
> >>> me
> >>>>>> know
> >>>>>>>> your
> >>>>>>>>> comments and feedback, thanks
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best Regards
> >>>>>>>>>
> >>>>>>>>> Jeff Zhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best Regards
> >>>>>>>
> >>>>>>> Jeff Zhang
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best Regards
> >>>>>
> >>>>> Jeff Zhang
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Best Regards
> >>>
> >>> Jeff Zhang
> >>>
> >>
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>
>

-- 
Best Regards

Jeff Zhang
