Is there any possibility to have something like Apache Livy [1] also for
Flink in the future?

[1] https://livy.apache.org/

On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:

> >>>  Any API we expose should not have dependencies on the runtime
> (flink-runtime) package or other implementation details. To me, this means
> that the current ClusterClient cannot be exposed to users because it uses
> quite some classes from the optimiser and runtime packages.
>
> We should change ClusterClient from a class to an interface.
> ExecutionEnvironment would use only the ClusterClient interface, which
> should live in flink-clients, while the concrete implementation class could
> live in flink-runtime.
>
> >>> What happens when a failure/restart in the client happens? There needs
> to be a way of re-establishing the connection to the job, setting up the
> listeners again, etc.
>
> Good point. First we need to define what a failure/restart in the client
> means. IIUC, it usually means a network failure, which would surface in
> class RestClient. If my understanding is correct, the restart/retry
> mechanism should be implemented in RestClient.
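
A minimal sketch of what "retry inside the client" could look like, in case it helps the discussion. RetrySketch and callWithRetry are made-up names for illustration, not Flink's RestClient API, and the doubling delay is just one possible policy.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper, not part of Flink: retries a call that may fail transiently. */
final class RetrySketch {

    /**
     * Calls {@code action} up to {@code maxAttempts} times. After each failed
     * attempt except the last it sleeps, doubling the delay every time.
     * If all attempts fail, the last exception is rethrown.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated status query that fails twice before succeeding.
        String status = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("connection reset");
            }
            return "RUNNING";
        }, 5, 10L);
        System.out.println(status + " after " + calls[0] + " attempts");
    }
}
```

Note that this only covers transient network failures. A restart of the client process itself would still need some way to re-attach to the job, which a retry loop alone does not provide.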
>
>
>
>
>
> On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Some points to consider:
> >
> > * Any API we expose should not have dependencies on the runtime
> > (flink-runtime) package or other implementation details. To me, this
> > means that the current ClusterClient cannot be exposed to users because
> > it uses quite a few classes from the optimiser and runtime packages.
> >
> > * What happens when a failure/restart in the client happens? There needs
> > to be a way of re-establishing the connection to the job, setting up the
> > listeners again, etc.
> >
> > Aljoscha
> >
> > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > Sorry folks, the design doc is later than expected. Here's the design
> > > doc I drafted; any comments and feedback are welcome.
> > >
> > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > >
> > >
> > >
> > > On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> Nice that this discussion is happening.
> > >>
> > >> In the FLIP, we could also revisit the entire role of the environments
> > >> again.
> > >>
> > >> Initially, the idea was:
> > >>  - the environments take care of the specific setup for standalone (no
> > >> setup needed), yarn, mesos, etc.
> > >>  - the session ones have control over the session. The environment
> > >> holds the session client.
> > >>  - running a job gives a "control" object for that job. That behavior
> > >> is the same in all environments.
> > >>
> > >> The actual implementation diverged quite a bit from that. Happy to
> > >> see a discussion about straightening this out a bit more.
> > >>
> > >>
> > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>
> > >>> Hi folks,
> > >>>
> > >>> Sorry for the late response. It seems we have reached consensus on
> > >>> this; I will create a FLIP for it with a more detailed design.
> > >>>
> > >>>
> > >>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Great to see this discussion seeded! The problems you face with the
> > >>>> Zeppelin integration are also affecting other downstream projects,
> > >>>> like Beam.
> > >>>>
> > >>>> We just enabled the savepoint restore option in
> > >>>> RemoteStreamEnvironment [1] and that was more difficult than it
> > >>>> should be. The main issue is that environment and cluster client
> > >>>> aren't decoupled. Ideally it should be possible to just get the
> > >>>> matching cluster client from the environment and then control the job
> > >>>> through it (environment as factory for cluster client). But note that
> > >>>> the environment classes are part of the public API, and it is not
> > >>>> straightforward to make larger changes without breaking backward
> > >>>> compatibility.
> > >>>>
> > >>>> ClusterClient currently exposes internal classes like JobGraph and
> > >>>> StreamGraph. But it should be possible to wrap this with a new public
> > >>>> API that brings the required job control capabilities for downstream
> > >>>> projects. Perhaps it is helpful to look at some of the interfaces in
> > >>>> Beam while thinking about this: [2] for the portable job API and [3]
> > >>>> for the old asynchronous job control from the Beam Java SDK.
> > >>>>
> > >>>> The backward compatibility discussion [4] is also relevant here. A new
> > >>>> API should shield downstream projects from internals and allow them to
> > >>>> interoperate with multiple future Flink versions in the same release
> > >>>> line without forced upgrades.
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>> [1] https://github.com/apache/flink/pull/7249
> > >>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > >>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > >>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > >>>>
> > >>>>
> > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>>>
> > >>>>> >>> I'm not so sure whether the user should be able to define where
> > >>>>> the job runs (in your example Yarn). This is actually independent of
> > >>>>> the job development and is something which is decided at deployment
> > >>>>> time.
> > >>>>>
> > >>>>> Users don't need to specify the execution mode programmatically. They
> > >>>>> can also pass the execution mode as an argument to the flink run
> > >>>>> command, e.g.
> > >>>>>
> > >>>>> bin/flink run -m yarn-cluster ....
> > >>>>> bin/flink run -m local ...
> > >>>>> bin/flink run -m host:port ...
> > >>>>>
> > >>>>> Does this make sense to you?
> > >>>>>
> > >>>>> >>> To me it makes sense that the ExecutionEnvironment is not
> > >>>>> directly initialized by the user and instead context sensitive how
> > >>>>> you want to execute your job (Flink CLI vs. IDE, for example).
> > >>>>>
> > >>>>> Right, currently I notice Flink creates a different
> > >>>>> ContextExecutionEnvironment based on the submission scenario (Flink
> > >>>>> CLI vs. IDE). To me this is a rather hacky approach and not so
> > >>>>> straightforward. What I suggested above is that Flink should always
> > >>>>> create the same ExecutionEnvironment but with different
> > >>>>> configuration, and based on the configuration it would create the
> > >>>>> proper ClusterClient for the different behaviors.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>>
> > >>>>>> You are probably right that we have code duplication when it comes
> > >>>>>> to the creation of the ClusterClient. This should be reduced in the
> > >>>>>> future.
> > >>>>>>
> > >>>>>> I'm not so sure whether the user should be able to define where the
> > >>>>>> job runs (in your example Yarn). This is actually independent of the
> > >>>>>> job development and is something which is decided at deployment
> > >>>>>> time. To me it makes sense that the ExecutionEnvironment is not
> > >>>>>> directly initialized by the user and is instead context-sensitive to
> > >>>>>> how you want to execute your job (Flink CLI vs. IDE, for example).
> > >>>>>> However, I agree that the ExecutionEnvironment should give you
> > >>>>>> access to the ClusterClient and to the job (maybe in the form of the
> > >>>>>> JobGraph or a job plan).
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Till
> > >>>>>>
> > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Till,
> > >>>>>>> Thanks for the feedback. You are right that I expect a better
> > >>>>>>> programmatic job submission/control API which could be used by
> > >>>>>>> downstream projects, and it would benefit the Flink ecosystem. When
> > >>>>>>> I look at the code of the Flink scala-shell and sql-client (I
> > >>>>>>> believe they are not the core of Flink, but belong to its
> > >>>>>>> ecosystem), I find a lot of duplicated code for creating a
> > >>>>>>> ClusterClient from user-provided configuration (the configuration
> > >>>>>>> format may differ between scala-shell and sql-client) and then
> > >>>>>>> using that ClusterClient to manipulate jobs. I don't think this is
> > >>>>>>> convenient for downstream projects. What I expect is that a
> > >>>>>>> downstream project only needs to provide the necessary
> > >>>>>>> configuration info (maybe introducing a class FlinkConf), then
> > >>>>>>> build an ExecutionEnvironment based on this FlinkConf, and the
> > >>>>>>> ExecutionEnvironment will create the proper ClusterClient. This
> > >>>>>>> would not only benefit downstream project development but also
> > >>>>>>> help with their integration tests against Flink. Here's a sample
> > >>>>>>> code snippet of what I expect.
> > >>>>>>>
> > >>>>>>> val conf = new FlinkConf().mode("yarn")
> > >>>>>>> val env = new ExecutionEnvironment(conf)
> > >>>>>>> val jobId = env.submit(...)
> > >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > >>>>>>> env.getClusterClient().cancelJob(jobId)
> > >>>>>>>
> > >>>>>>> What do you think ?
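
For reference, the same idea as a small self-contained Java sketch. Everything here is a stand-in: ConfSketch, ClientSketch, RecordingClient and EnvSketch are hypothetical names, job ids are plain strings, and the "clients" only record state in memory. The point is just that one environment class picks the client from the configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed FlinkConf: only carries a mode string. */
final class ConfSketch {
    final String mode;
    ConfSketch(String mode) { this.mode = mode; }
}

/** Hypothetical minimal client interface; the real ClusterClient is much larger. */
interface ClientSketch {
    String submit(String jobName);          // returns a job id
    String queryJobStatus(String jobId);
    void cancelJob(String jobId);
}

/** In-memory client that only records job states and tags ids with its target. */
final class RecordingClient implements ClientSketch {
    private final String target;
    private final Map<String, String> states = new HashMap<>();
    private int nextId = 1;

    RecordingClient(String target) { this.target = target; }

    @Override public String submit(String jobName) {
        String jobId = target + "-" + nextId++;
        states.put(jobId, "RUNNING");
        return jobId;
    }

    @Override public String queryJobStatus(String jobId) {
        return states.getOrDefault(jobId, "UNKNOWN");
    }

    @Override public void cancelJob(String jobId) {
        states.computeIfPresent(jobId, (id, state) -> "CANCELED");
    }
}

/** One environment class; the configuration alone decides which client is built. */
final class EnvSketch {
    private final ClientSketch client;

    EnvSketch(ConfSketch conf) { this.client = createClient(conf); }

    private static ClientSketch createClient(ConfSketch conf) {
        switch (conf.mode) {
            case "local": return new RecordingClient("local");
            case "yarn":  return new RecordingClient("yarn");
            default: throw new IllegalArgumentException("unknown mode: " + conf.mode);
        }
    }

    String submit(String jobName) { return client.submit(jobName); }

    ClientSketch getClusterClient() { return client; }

    public static void main(String[] args) {
        EnvSketch env = new EnvSketch(new ConfSketch("yarn"));
        String jobId = env.submit("wordcount");
        System.out.println(jobId + " is " + env.getClusterClient().queryJobStatus(jobId));
        env.getClusterClient().cancelJob(jobId);
        System.out.println(jobId + " is " + env.getClusterClient().queryJobStatus(jobId));
    }
}
```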
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Jeff,
> > >>>>>>>>
> > >>>>>>>> what you are proposing is to provide the user with better
> > >>>>>>>> programmatic job control. There was actually an effort to achieve
> > >>>>>>>> this but it has never been completed [1]. However, there are some
> > >>>>>>>> improvements in the code base now. Look for example at the
> > >>>>>>>> NewClusterClient interface which offers a non-blocking job
> > >>>>>>>> submission. But I agree that we need to improve Flink in this
> > >>>>>>>> regard.
> > >>>>>>>>
> > >>>>>>>> I would not be in favour of exposing all ClusterClient calls via
> > >>>>>>>> the ExecutionEnvironment because it would clutter the class and
> > >>>>>>>> would not be a good separation of concerns. Instead one idea could
> > >>>>>>>> be to retrieve the current ClusterClient from the
> > >>>>>>>> ExecutionEnvironment which can then be used for cluster and job
> > >>>>>>>> control. But before we start an effort here, we need to agree and
> > >>>>>>>> capture what functionality we want to provide.
> > >>>>>>>>
> > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> > >>>>>>>> describing how to talk to a cluster manager like Yarn or Mesos. The
> > >>>>>>>> ClusterDescriptor can be used for deploying Flink clusters (job and
> > >>>>>>>> session) and gives you a ClusterClient. The ClusterClient controls
> > >>>>>>>> the cluster (e.g. submitting jobs, listing all running jobs). And
> > >>>>>>>> then there was the idea to introduce a JobClient which you obtain
> > >>>>>>>> from the ClusterClient to trigger job specific operations (e.g.
> > >>>>>>>> taking a savepoint, cancelling the job).
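
The three layers described above can be sketched roughly as follows. This is only an illustration of the layering, under the assumption that each layer hands out the next one. DescriptorSketch, ClusterHandle and JobHandle are invented names with in-memory behaviour, not the signatures of Flink's ClusterDescriptor, ClusterClient or the proposed JobClient.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Job-scoped operations (savepoint, cancel). Hypothetical shape. */
interface JobHandle {
    String jobId();
    String triggerSavepoint(String targetDirectory);   // returns the savepoint path
    void cancel();
}

/** Cluster-scoped operations (submit, list). Hypothetical shape. */
interface ClusterHandle {
    JobHandle submitJob(String jobName);
    List<String> listRunningJobs();
}

/** Knows how to reach a cluster manager and hands back a ClusterHandle. */
interface DescriptorSketch {
    ClusterHandle deploySessionCluster();
}

/** In-memory cluster: tracks running jobs in submission order. */
final class InMemoryCluster implements ClusterHandle {
    private final Map<String, String> running = new LinkedHashMap<>(); // jobId -> job name
    private int nextId = 1;

    @Override public JobHandle submitJob(String jobName) {
        final String id = "job-" + nextId++;
        running.put(id, jobName);
        // The job handle is obtained from the cluster handle, as in the description.
        return new JobHandle() {
            @Override public String jobId() { return id; }
            @Override public String triggerSavepoint(String targetDirectory) {
                return targetDirectory + "/savepoint-" + id;
            }
            @Override public void cancel() { running.remove(id); }
        };
    }

    @Override public List<String> listRunningJobs() {
        return new ArrayList<>(running.keySet());
    }
}

/** In-memory descriptor: "deploying" just creates a fresh in-memory cluster. */
final class InMemoryDescriptor implements DescriptorSketch {
    @Override public ClusterHandle deploySessionCluster() {
        return new InMemoryCluster();
    }

    public static void main(String[] args) {
        ClusterHandle cluster = new InMemoryDescriptor().deploySessionCluster();
        JobHandle first = cluster.submitJob("wordcount");
        cluster.submitJob("pagerank");
        System.out.println("running: " + cluster.listRunningJobs());
        System.out.println("savepoint at " + first.triggerSavepoint("/tmp/savepoints"));
        first.cancel();
        System.out.println("running: " + cluster.listRunningJobs());
    }
}
```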
> > >>>>>>>>
> > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Till
> > >>>>>>>>
> > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Folks,
> > >>>>>>>>>
> > >>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is an
> > >>>>>>>>> interactive notebook, and I hit several issues caused by the
> > >>>>>>>>> Flink client API. So I'd like to propose the following changes to
> > >>>>>>>>> the Flink client API.
> > >>>>>>>>>
> > >>>>>>>>> 1. Support nonblocking execution. Currently,
> > >>>>>>>>> ExecutionEnvironment#execute is a blocking method which does two
> > >>>>>>>>> things: it first submits the job and then waits until the job is
> > >>>>>>>>> finished. I'd like to introduce a nonblocking execution method
> > >>>>>>>>> like ExecutionEnvironment#submit which only submits the job and
> > >>>>>>>>> then returns the jobId to the client, and to allow the user to
> > >>>>>>>>> query the job status via the jobId.
> > >>>>>>>>>
> > >>>>>>>>> 2. Add a cancel API in
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. Currently the
> > >>>>>>>>> only way to cancel a job is via the CLI (bin/flink), which is not
> > >>>>>>>>> convenient for downstream projects. So I'd like to add a cancel
> > >>>>>>>>> API in ExecutionEnvironment.
> > >>>>>>>>>
> > >>>>>>>>> 3. Add a savepoint API in
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. It is similar to
> > >>>>>>>>> the cancel API; we should use ExecutionEnvironment as the unified
> > >>>>>>>>> API for third parties to integrate with Flink.
> > >>>>>>>>>
> > >>>>>>>>> 4. Add a listener for the job execution lifecycle, something like
> > >>>>>>>>> the following, so that downstream projects can run custom logic
> > >>>>>>>>> during the lifecycle of a job. E.g. Zeppelin would capture the
> > >>>>>>>>> jobId after the job is submitted and then use this jobId to
> > >>>>>>>>> cancel it later when necessary.
> > >>>>>>>>>
> > >>>>>>>>> public interface JobListener {
> > >>>>>>>>>
> > >>>>>>>>>   void onJobSubmitted(JobID jobId);
> > >>>>>>>>>
> > >>>>>>>>>   void onJobExecuted(JobExecutionResult jobResult);
> > >>>>>>>>>
> > >>>>>>>>>   void onJobCanceled(JobID jobId);
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently this is
> > >>>>>>>>> disabled, but a session is very convenient for third parties that
> > >>>>>>>>> submit jobs continually. I hope Flink can enable it again.
> > >>>>>>>>>
> > >>>>>>>>> 6. Unify all Flink client APIs into
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > >>>>>>>>>
> > >>>>>>>>> This is a long-term issue which needs more careful thinking and
> > >>>>>>>>> design. Currently some features of Flink are exposed in
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are
> > >>>>>>>>> exposed in the CLI instead of the API, like the cancel and
> > >>>>>>>>> savepoint operations I mentioned above. I think the root cause is
> > >>>>>>>>> that Flink didn't unify the ways of interacting with it. Here I
> > >>>>>>>>> list 3 scenarios of Flink operation:
> > >>>>>>>>>
> > >>>>>>>>>   - Local job execution. Flink will create a LocalEnvironment and
> > >>>>>>>>>   then use this LocalEnvironment to create a LocalExecutor for job
> > >>>>>>>>>   execution.
> > >>>>>>>>>   - Remote job execution. Flink will create a ClusterClient first,
> > >>>>>>>>>   then create a ContextEnvironment based on the ClusterClient, and
> > >>>>>>>>>   then run the job.
> > >>>>>>>>>   - Job cancellation. Flink will create a ClusterClient first and
> > >>>>>>>>>   then cancel the job via this ClusterClient.
> > >>>>>>>>>
> > >>>>>>>>> As you can see, in the above 3 scenarios Flink doesn't use the
> > >>>>>>>>> same approach (code path) to interact with Flink.
> > >>>>>>>>> What I propose is the following:
> > >>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based on
> > >>>>>>>>> user configuration) --> Use this Environment to create the proper
> > >>>>>>>>> ClusterClient (LocalClusterClient or RestClusterClient) to
> > >>>>>>>>> interact with Flink (job execution or cancellation)
> > >>>>>>>>>
> > >>>>>>>>> This way we can unify the process of local execution and remote
> > >>>>>>>>> execution. It also makes it much easier for third parties to
> > >>>>>>>>> integrate with Flink, because ExecutionEnvironment is the unified
> > >>>>>>>>> entry point. All a third party needs to do is pass configuration
> > >>>>>>>>> to ExecutionEnvironment, and ExecutionEnvironment will do the
> > >>>>>>>>> right thing based on that configuration. The Flink CLI can also
> > >>>>>>>>> be considered a consumer of the Flink API: it just passes the
> > >>>>>>>>> configuration to ExecutionEnvironment and lets
> > >>>>>>>>> ExecutionEnvironment create the proper ClusterClient instead of
> > >>>>>>>>> creating the ClusterClient directly.
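
To make items 1, 2 and 4 of the list above a bit more concrete, here is a rough sketch of how a nonblocking submit, a cancel call and lifecycle listeners could fit together. It is not Flink code: NonBlockingEnvSketch, ListenerSketch and JobState are invented for illustration, String stands in for JobID and JobExecutionResult, a "job" is just a Supplier run on a background thread, and cancel() only marks the job as canceled without interrupting the work.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Mirrors the proposed JobListener; String stands in for JobID and the result type. */
interface ListenerSketch {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId, String result);
    void onJobCanceled(String jobId);
}

enum JobState { RUNNING, FINISHED, FAILED, CANCELED }

/** Hypothetical environment: submit() returns at once, listeners see lifecycle events. */
final class NonBlockingEnvSketch {
    private final List<ListenerSketch> listeners = new CopyOnWriteArrayList<>();
    private final Map<String, AtomicReference<JobState>> states = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<String>> completions = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger(1);

    void addJobListener(ListenerSketch listener) { listeners.add(listener); }

    /** Starts the job on a background thread and returns its id without waiting. */
    String submit(Supplier<String> job) {
        String jobId = "job-" + nextId.getAndIncrement();
        AtomicReference<JobState> state = new AtomicReference<>(JobState.RUNNING);
        states.put(jobId, state);
        listeners.forEach(l -> l.onJobSubmitted(jobId));
        completions.put(jobId, CompletableFuture.supplyAsync(job).whenComplete((result, error) -> {
            JobState end = (error == null) ? JobState.FINISHED : JobState.FAILED;
            // Only report a result if the job was not canceled in the meantime.
            if (state.compareAndSet(JobState.RUNNING, end) && error == null) {
                listeners.forEach(l -> l.onJobExecuted(jobId, result));
            }
        }));
        return jobId;
    }

    /** Marks a running job as canceled; returns false if it already ended or is unknown. */
    boolean cancel(String jobId) {
        AtomicReference<JobState> state = states.get(jobId);
        if (state != null && state.compareAndSet(JobState.RUNNING, JobState.CANCELED)) {
            listeners.forEach(l -> l.onJobCanceled(jobId));
            return true;
        }
        return false;
    }

    String status(String jobId) {
        AtomicReference<JobState> state = states.get(jobId);
        return state == null ? "UNKNOWN" : state.get().name();
    }

    /** Blocks until the background work has ended, ignoring the job's own failure. */
    void await(String jobId) {
        CompletableFuture<String> done = completions.get(jobId);
        if (done != null) {
            done.exceptionally(error -> null).join();
        }
    }

    public static void main(String[] args) {
        NonBlockingEnvSketch env = new NonBlockingEnvSketch();
        env.addJobListener(new ListenerSketch() {
            @Override public void onJobSubmitted(String id) { System.out.println("submitted " + id); }
            @Override public void onJobExecuted(String id, String r) { System.out.println("executed " + id + ": " + r); }
            @Override public void onJobCanceled(String id) { System.out.println("canceled " + id); }
        });
        CountDownLatch gate = new CountDownLatch(1);
        String jobId = env.submit(() -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            return "done";
        });
        System.out.println(jobId + " is " + env.status(jobId)); // submit() did not block
        env.cancel(jobId);
        gate.countDown();
        env.await(jobId);
        System.out.println(jobId + " is " + env.status(jobId));
    }
}
```

A notebook front end in this sketch would keep the id handed to onJobSubmitted and pass it to cancel() later, which is the Zeppelin use case described in item 4.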
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Item 6 would involve a large code refactoring, so I think we can
> > >>>>>>>>> defer it to a future release; 1, 2, 3, 4 and 5 could be done at
> > >>>>>>>>> once, I believe. Let me know your comments and feedback, thanks.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> Best Regards
> > >>>>>>>>>
> > >>>>>>>>> Jeff Zhang
> > >>>>>>>>>
