Hi Peter,

As described above, this effort should get attention from the people developing
FLIP-73, a.k.a. the Executor abstractions. I recommend that you join the public
Slack channel[1] for the Flink Client API Enhancement and share your detailed
thoughts there; they will likely get more concrete attention.

Best,
tison.

[1]
https://slack.com/share/IS21SJ75H/Rk8HhUly9FuEHb7oGwBZ33uL/enQtODg2MDYwNjE5MTg3LTA2MjIzNDc1M2ZjZDVlMjdlZjk1M2RkYmJhNjAwMTk2ZDZkODQ4NmY5YmI4OGRhNWJkYTViMTM1NzlmMzc4OWM


Peter Huang <huangzhenqiu0...@gmail.com> wrote on Tue, Jan 7, 2020 at 5:09 AM:

> Dear All,
>
> Happy new year! Based on the feedback from the community so far, we have
> revised the doc to cover session cluster support, the concrete interface
> changes needed, and the execution plan. Please take one more round of review
> at your convenience.
>
>
> https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit#
>
>
> Best Regards
> Peter Huang
>
>
>
>
>
> On Thu, Jan 2, 2020 at 11:29 AM Peter Huang <huangzhenqiu0...@gmail.com>
> wrote:
>
> > Hi Dian,
> > Thanks for giving us valuable feedback.
> >
> > 1) It's better to have a whole design for this feature
> > Regarding the suggestion to also enable the cluster mode for session
> > clusters, I think Flink already supports it: WebSubmissionExtension already
> > allows users to start a job with a specified jar through the web UI.
> > But we still need to enable the feature from the CLI for both local and
> > remote jars. I will align with Yang Wang on the details first and then
> > update the design doc.
> >
> > 2) It's better to consider the convenience for users, such as debugging
> >
> > I am wondering whether we can store the exception from job graph
> > generation in the application master. As no streaming graph can be
> > scheduled in this case, no more TMs will be requested from the Flink RM.
> > If the AM is still running, users can still query the error from the CLI.
> > As it requires more changes, we can get some feedback from
> > <aljos...@apache.org> and @zjf...@gmail.com <zjf...@gmail.com>.
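> >
> > Just to make the idea concrete, here is a minimal, purely hypothetical
> > sketch of how the entrypoint could keep the compilation error around for a
> > later CLI query. None of the class or method names below are existing
> > Flink APIs.
> >
> > import java.util.Optional;
> > import java.util.concurrent.atomic.AtomicReference;
> >
> > /** Hypothetical holder that keeps the job graph compilation error so a REST
> >  *  handler polled by the CLI could report it while the AM is still running. */
> > public final class CompilationErrorHolder {
> >
> >     private static final AtomicReference<Throwable> LAST_ERROR = new AtomicReference<>();
> >
> >     private CompilationErrorHolder() {}
> >
> >     /** Called by the entrypoint when building the JobGraph from the user program fails. */
> >     public static void record(Throwable error) {
> >         LAST_ERROR.set(error);
> >     }
> >
> >     /** Queried by the (hypothetical) handler behind the CLI's status request. */
> >     public static Optional<Throwable> lastError() {
> >         return Optional.ofNullable(LAST_ERROR.get());
> >     }
> > }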
> >
> > 3) It's better to consider the impact on the stability of the cluster
> >
> > I agree with Yang Wang's opinion.
> >
> >
> >
> > Best Regards
> > Peter Huang
> >
> >
> > On Sun, Dec 29, 2019 at 9:44 PM Dian Fu <dian0511...@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> Sorry to jump into this discussion. Thanks everyone for the discussion.
> >> I'm very interested in this topic although I'm not an expert in this
> part.
> >> So I'm glad to share my thoughts as follows:
> >>
> >> 1) It's better to have a whole design for this feature
> >> As we know, there are two deployment modes: per-job mode and session mode.
> >> I'm wondering which mode really needs this feature. As the design doc
> >> mentions, per-job mode is mostly used for streaming jobs and session mode
> >> is usually used for batch jobs (of course, the job types and the deployment
> >> modes are orthogonal). Usually a streaming job only needs to be submitted
> >> once and will then run for days or weeks, while batch jobs are submitted
> >> more frequently than streaming jobs. This means that session mode may also
> >> need this feature. However, if we support this feature in session mode, the
> >> application master will become a new centralized compilation service (which
> >> is a problem that needs to be solved). So in this case, it's better to have
> >> a complete design for both per-job mode and session mode. Furthermore, even
> >> if we do it phase by phase, we need a whole picture of how it works in both
> >> per-job mode and session mode.
> >>
> >> 2) It's better to consider the convenience for users, such as debugging
> >> After we finish this feature, the job graph will be compiled in the
> >> application master, which means that users cannot easily get the exception
> >> message synchronously in the job client if there are problems during job
> >> graph compilation (especially for platform users), such as an incorrect
> >> resource path, problems in the user program itself, etc. What I'm thinking
> >> is that maybe we should throw such exceptions as early as possible (during
> >> the job submission stage).
> >>
> >> 3) It's better to consider the impact on the stability of the cluster
> >> If we perform the compilation in the application master, we should consider
> >> the impact of compilation errors. Although YARN can restart the application
> >> master in case of failures, in some cases a compilation failure is a waste
> >> of cluster resources and may impact the stability of the cluster and the
> >> other jobs in it, for example when the resource path is incorrect or the
> >> user program itself has problems (job failover cannot solve this kind of
> >> problem). In the current implementation, compilation errors are handled on
> >> the client side and there is no impact on the cluster at all.
> >>
> >> Regarding 1), it's clearly pointed out in the design doc that only per-job
> >> mode will be supported. However, I think it's better to also consider
> >> session mode in the design doc.
> >> Regarding 2) and 3), I have not seen related sections in the design doc. It
> >> would be good if we could cover them there.
> >>
> >> Feel free to correct me if there is anything I misunderstood.
> >>
> >> Regards,
> >> Dian
> >>
> >>
> >> > On Dec 27, 2019, at 3:13 AM, Peter Huang <huangzhenqiu0...@gmail.com> wrote:
> >> >
> >> > Hi Yang,
> >> >
> >> > I can't agree more. The effort definitely needs to align with the final
> >> > goal of FLIP-73.
> >> > I am thinking about whether we can achieve the goal in two phases.
> >> >
> >> > 1) Phase I
> >> > As the CliFrontend will not be deprecated soon, we can still use the
> >> > deployMode flag there, pass the program info through the Flink
> >> > configuration, and use the ClassPathJobGraphRetriever to generate the job
> >> > graph in the ClusterEntrypoints of YARN and Kubernetes.
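> >> >
> >> > To illustrate this, a rough sketch of how the CLI could ship the program
> >> > description through the Flink configuration is below. The config keys and
> >> > helper methods are made up for illustration only; they are not existing
> >> > Flink options or classes.
> >> >
> >> > import java.util.HashMap;
> >> > import java.util.Map;
> >> >
> >> > /** Hypothetical sketch: the CLI writes the program description into the
> >> >  *  configuration and the cluster entrypoint reads it back to build the JobGraph. */
> >> > public class ProgramInfoViaConfiguration {
> >> >
> >> >     // Made-up keys, only for illustration.
> >> >     static final String MAIN_CLASS_KEY = "internal.deploy.program.main-class";
> >> >     static final String PROGRAM_ARGS_KEY = "internal.deploy.program.args";
> >> >
> >> >     /** Client side (CliFrontend with deployMode = cluster). */
> >> >     static Map<String, String> describeProgram(String mainClass, String args) {
> >> >         Map<String, String> conf = new HashMap<>();
> >> >         conf.put(MAIN_CLASS_KEY, mainClass);
> >> >         conf.put(PROGRAM_ARGS_KEY, args);
> >> >         return conf;
> >> >     }
> >> >
> >> >     /** Cluster side: the entrypoint hands this to a ClassPathJobGraphRetriever-style component. */
> >> >     static String[] readProgram(Map<String, String> conf) {
> >> >         return new String[] {
> >> >             conf.get(MAIN_CLASS_KEY),
> >> >             conf.getOrDefault(PROGRAM_ARGS_KEY, "")
> >> >         };
> >> >     }
> >> > }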
> >> >
> >> > 2) Phase II
> >> > In AbstractJobClusterExecutor, the job graph is generated in the execute
> >> > function. We can still use the deployMode in it. With deployMode = cluster,
> >> > the execute function only starts the cluster.
> >> >
> >> > When the {Yarn/Kubernetes}PerJobClusterEntrypoint starts, it will start the
> >> > dispatcher first; then we can use a ClusterEnvironment, similar to
> >> > ContextEnvironment, to submit the job with its job name to the local
> >> > dispatcher. For the details, we need more investigation. Let's wait for
> >> > @Aljoscha Krettek <aljos...@apache.org> and @Till Rohrmann
> >> > <trohrm...@apache.org>'s feedback after the holiday season.
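> >> >
> >> > To make the Phase II idea a bit more concrete, here is a very rough sketch
> >> > of the deployMode branch in the executor's execute function. DeployMode,
> >> > ClusterDeployer and JobGraphCompiler are illustrative placeholders, not
> >> > Flink classes.
> >> >
> >> > /** Hypothetical sketch of Phase II; all types here are placeholders, not Flink APIs. */
> >> > public class PerJobExecutorSketch {
> >> >
> >> >     enum DeployMode { CLIENT, CLUSTER }
> >> >
> >> >     interface ClusterDeployer { void deployCluster(); void submit(Object jobGraph); }
> >> >
> >> >     interface JobGraphCompiler { Object compileJobGraph(); }
> >> >
> >> >     private final DeployMode deployMode;
> >> >     private final ClusterDeployer deployer;
> >> >     private final JobGraphCompiler compiler;
> >> >
> >> >     PerJobExecutorSketch(DeployMode deployMode, ClusterDeployer deployer, JobGraphCompiler compiler) {
> >> >         this.deployMode = deployMode;
> >> >         this.deployer = deployer;
> >> >         this.compiler = compiler;
> >> >     }
> >> >
> >> >     void execute() {
> >> >         if (deployMode == DeployMode.CLUSTER) {
> >> >             // Only bring up the cluster; the entrypoint starts the dispatcher,
> >> >             // builds the job graph on the master side and submits it locally.
> >> >             deployer.deployCluster();
> >> >             return;
> >> >         }
> >> >         // Classic behavior: compile on the client and submit to the new cluster.
> >> >         Object jobGraph = compiler.compileJobGraph();
> >> >         deployer.deployCluster();
> >> >         deployer.submit(jobGraph);
> >> >     }
> >> > }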
> >> >
> >> > Thank you in advance. Merry Christmas and Happy New Year!!!
> >> >
> >> >
> >> > Best Regards
> >> > Peter Huang
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Wed, Dec 25, 2019 at 1:08 AM Yang Wang <danrtsey...@gmail.com>
> >> wrote:
> >> >
> >> >> Hi Peter,
> >> >>
> >> >> I think we need to reconsider tison's suggestion seriously. After FLIP-73,
> >> >> the deployJobCluster has been moved into `JobClusterExecutor#execute`. It
> >> >> should not be visible to `CliFrontend`. That means the user program will
> >> >> *ALWAYS* be executed on the client side. This is the behavior by design.
> >> >> So we cannot just add `if (client mode) ... else if (cluster mode) ...`
> >> >> code in `CliFrontend` to bypass the executor. We need to find a clean way
> >> >> to decouple executing the user program from deploying the per-job cluster.
> >> >> Based on this, we could support executing the user program on either the
> >> >> client or the master side.
> >> >>
> >> >> Maybe Aljoscha and Jeff could give some good suggestions.
> >> >>
> >> >>
> >> >>
> >> >> Best,
> >> >> Yang
> >> >>
> >> >> Peter Huang <huangzhenqiu0...@gmail.com> wrote on Wed, Dec 25, 2019 at 4:03 AM:
> >> >>
> >> >>> Hi Jingjing,
> >> >>>
> >> >>> The proposed improvement is a deployment option for the CLI. For SQL-based
> >> >>> Flink applications, it is more convenient to use the existing model of the
> >> >>> SqlClient, in which the job graph is generated within the SqlClient. After
> >> >>> adding the delayed job graph generation, I think no change is needed on
> >> >>> your side.
> >> >>>
> >> >>>
> >> >>> Best Regards
> >> >>> Peter Huang
> >> >>>
> >> >>>
> >> >>> On Wed, Dec 18, 2019 at 6:01 AM jingjing bai <
> >> baijingjing7...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>> hi peter:
> >> >>>>    we extended the SqlClient to support SQL job submission via a web UI
> >> >>>> based on Flink 1.9, and we support submitting to YARN in per-job mode too.
> >> >>>>    in this case, the job graph is generated on the client side. I think
> >> >>>> this discussion is mainly about improving the API program path, but in my
> >> >>>> case there is no jar to upload, only a SQL string.
> >> >>>>    do you have more suggestions to improve the SQL mode, or is it only a
> >> >>>> switch for API programs?
> >> >>>>
> >> >>>>
> >> >>>> best
> >> >>>> bai jj
> >> >>>>
> >> >>>>
> >> >>>> Yang Wang <danrtsey...@gmail.com> wrote on Wed, Dec 18, 2019 at 7:21 PM:
> >> >>>>
> >> >>>>> I just want to revive this discussion.
> >> >>>>>
> >> >>>>> Recently, I have been thinking about how to natively run a Flink per-job
> >> >>>>> cluster on Kubernetes. The per-job mode on Kubernetes is very different
> >> >>>>> from the one on YARN, and we will have the same deployment requirements
> >> >>>>> for the client and the entry point.
> >> >>>>>
> >> >>>>> 1. The Flink client does not always need a local jar to start a Flink
> >> >>>>> per-job cluster. We could support multiple URI schemes (see the sketch
> >> >>>>> below). For example, file:///path/of/my.jar means a jar located on the
> >> >>>>> client side, hdfs://myhdfs/user/myname/flink/my.jar means a jar located
> >> >>>>> on a remote HDFS, and local:///path/in/image/my.jar means a jar located
> >> >>>>> on the jobmanager side.
> >> >>>>>
> >> >>>>> 2. Support running the user program on the master side. This also means
> >> >>>>> the entry point will generate the job graph on the master side. We could
> >> >>>>> use the ClasspathJobGraphRetriever or start a local Flink client to
> >> >>>>> achieve this purpose.
> >> >>>>>
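> >> >>>>> To make the scheme handling in point 1 concrete, here is a minimal
> >> >>>>> sketch; the enum and method names are only illustrative and not part of
> >> >>>>> Flink.
> >> >>>>>
> >> >>>>> import java.net.URI;
> >> >>>>>
> >> >>>>> public class UserJarLocation {
> >> >>>>>
> >> >>>>>     enum Kind { CLIENT_LOCAL, REMOTE_DFS, IMAGE_LOCAL }
> >> >>>>>
> >> >>>>>     /** Decides where the user jar lives based on its URI scheme. */
> >> >>>>>     static Kind resolve(String userJar) {
> >> >>>>>         String scheme = URI.create(userJar).getScheme();
> >> >>>>>         if (scheme == null || "file".equals(scheme)) {
> >> >>>>>             return Kind.CLIENT_LOCAL;   // file:///path/of/my.jar -> ship it from the client
> >> >>>>>         }
> >> >>>>>         if ("local".equals(scheme)) {
> >> >>>>>             return Kind.IMAGE_LOCAL;    // local:///path/in/image/my.jar -> already on the JM
> >> >>>>>         }
> >> >>>>>         return Kind.REMOTE_DFS;         // hdfs://myhdfs/... -> fetched by the cluster
> >> >>>>>     }
> >> >>>>> }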
> >> >>>>>
> >> >>>>> cc tison, Aljoscha & Kostas: do you think this is the right direction
> >> >>>>> for us to work in?
> >> >>>>>
> >> >>>>> tison <wander4...@gmail.com> wrote on Thu, Dec 12, 2019 at 4:48 PM:
> >> >>>>>
> >> >>>>>> A quick idea is that we separate the deployment from the user program,
> >> >>>>>> so that deployment is always done outside the program. When the user
> >> >>>>>> program is executed, there is always a ClusterClient that communicates
> >> >>>>>> with an existing cluster, remote or local. It will be another thread of
> >> >>>>>> work, so this is just for your information.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> tison.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> tison <wander4...@gmail.com> wrote on Thu, Dec 12, 2019 at 4:40 PM:
> >> >>>>>>
> >> >>>>>>> Hi Peter,
> >> >>>>>>>
> >> >>>>>>> Another concern I realized recently is that with the current Executors
> >> >>>>>>> abstraction (FLIP-73), I'm afraid the user program is designed to ALWAYS
> >> >>>>>>> run on the client side. Specifically, we deploy the job in the executor
> >> >>>>>>> when env.execute is called. This abstraction possibly prevents Flink
> >> >>>>>>> from running the user program on the cluster side.
> >> >>>>>>>
> >> >>>>>>> For your proposal, in this case we have already compiled the program and
> >> >>>>>>> run it on the client side; even if we deploy a cluster and retrieve the
> >> >>>>>>> job graph from the program metadata, it doesn't make much sense.
> >> >>>>>>>
> >> >>>>>>> cc Aljoscha & Kostas: what do you think about this constraint?
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> tison.
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> Peter Huang <huangzhenqiu0...@gmail.com> wrote on Tue, Dec 10, 2019 at 12:45 PM:
> >> >>>>>>>
> >> >>>>>>>> Hi Tison,
> >> >>>>>>>>
> >> >>>>>>>> Yes, you are right. I think I made the wrong argument in the doc.
> >> >>>>>>>> Basically, the jar packaging problem only affects platform users. In our
> >> >>>>>>>> internal deploy service, we further optimized the deployment latency by
> >> >>>>>>>> letting users package flink-runtime together with the uber jar, so that
> >> >>>>>>>> we don't need to consider multiple Flink version support for now. In the
> >> >>>>>>>> session client mode, as the Flink libs will be shipped anyway as local
> >> >>>>>>>> resources of YARN, users actually don't need to package those libs into
> >> >>>>>>>> the job jar.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Best Regards
> >> >>>>>>>> Peter Huang
> >> >>>>>>>>
> >> >>>>>>>> On Mon, Dec 9, 2019 at 8:35 PM tison <wander4...@gmail.com>
> >> >>> wrote:
> >> >>>>>>>>
> >> >>>>>>>>>> 3. What do you mean by the packaging? Do users need to compile their
> >> >>>>>>>>>> jars including the flink-clients, flink-optimizer and flink-table code?
> >> >>>>>>>>>
> >> >>>>>>>>> The answer should be no because they exist in system
> classpath.
> >> >>>>>>>>>
> >> >>>>>>>>> Best,
> >> >>>>>>>>> tison.
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> Yang Wang <danrtsey...@gmail.com> wrote on Tue, Dec 10, 2019 at 12:18 PM:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Peter,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks a lot for starting this discussion. I think this is a
> >> >>> very
> >> >>>>>>>> useful
> >> >>>>>>>>>> feature.
> >> >>>>>>>>>>
> >> >>>>>>>>>> This is not only useful for YARN; I am focused on the Flink on
> >> >>>>>>>>>> Kubernetes integration and came across the same problem. I do not want
> >> >>>>>>>>>> the job graph to be generated on the client side. Instead, the user
> >> >>>>>>>>>> jars are built into a user-defined image. When the job manager is
> >> >>>>>>>>>> launched, we just need to generate the job graph based on the local
> >> >>>>>>>>>> user jars.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I have some small suggestion about this.
> >> >>>>>>>>>>
> >> >>>>>>>>>> 1. `ProgramJobGraphRetriever` is very similar to
> >> >>>>>>>>>> `ClasspathJobGraphRetriever`; the difference is that the former needs
> >> >>>>>>>>>> `ProgramMetadata` while the latter needs some arguments. Is it possible
> >> >>>>>>>>>> to have a unified `JobGraphRetriever` to support both? A rough sketch
> >> >>>>>>>>>> of the idea is included below.
> >> >>>>>>>>>> 2. Is it possible to not use a local user jar to start a per-job
> >> >>>>>>>>>> cluster? In your case, the user jars already exist on HDFS, yet we
> >> >>>>>>>>>> still need to download them to the deployer service. Currently, we
> >> >>>>>>>>>> always need a local user jar to start a Flink cluster. It would be
> >> >>>>>>>>>> great if we could support remote user jars.
> >> >>>>>>>>>>>> In the implementation, we assume users package
> >> >>> flink-clients,
> >> >>>>>>>>>> flink-optimizer, flink-table together within the job jar.
> >> >>>>> Otherwise,
> >> >>>>>>>> the
> >> >>>>>>>>>> job graph generation within JobClusterEntryPoint will fail.
> >> >>>>>>>>>> 3. What do you mean by the packaging? Do users need to compile their
> >> >>>>>>>>>> jars including the flink-clients, flink-optimizer and flink-table code?
> >> >>>>>>>>>>
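> >> >>>>>>>>>> Regarding suggestion 1, a rough sketch of what a unified retriever
> >> >>>>>>>>>> could look like is below. These interfaces are hypothetical and only
> >> >>>>>>>>>> illustrate the idea; they are not existing Flink classes, and Object
> >> >>>>>>>>>> stands in for the real JobGraph type.
> >> >>>>>>>>>>
> >> >>>>>>>>>> import java.util.Map;
> >> >>>>>>>>>>
> >> >>>>>>>>>> /** Hypothetical: one retriever interface with two implementations. */
> >> >>>>>>>>>> interface UnifiedJobGraphRetriever<T> {
> >> >>>>>>>>>>     /** Builds the job graph from whatever program description the entrypoint was given. */
> >> >>>>>>>>>>     Object retrieveJobGraph(T programDescription) throws Exception;
> >> >>>>>>>>>> }
> >> >>>>>>>>>>
> >> >>>>>>>>>> /** Driven by program arguments and a job class on the classpath... */
> >> >>>>>>>>>> class ClasspathStyleRetriever implements UnifiedJobGraphRetriever<String[]> {
> >> >>>>>>>>>>     @Override
> >> >>>>>>>>>>     public Object retrieveJobGraph(String[] programArguments) {
> >> >>>>>>>>>>         // ... locate the job class on the classpath, run it, return the job graph ...
> >> >>>>>>>>>>         return new Object();
> >> >>>>>>>>>>     }
> >> >>>>>>>>>> }
> >> >>>>>>>>>>
> >> >>>>>>>>>> /** ... or by precomputed program metadata. */
> >> >>>>>>>>>> class MetadataStyleRetriever implements UnifiedJobGraphRetriever<Map<String, String>> {
> >> >>>>>>>>>>     @Override
> >> >>>>>>>>>>     public Object retrieveJobGraph(Map<String, String> programMetadata) {
> >> >>>>>>>>>>         // ... read the main class, arguments and savepoint settings from the metadata ...
> >> >>>>>>>>>>         return new Object();
> >> >>>>>>>>>>     }
> >> >>>>>>>>>> }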
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Yang
> >> >>>>>>>>>>
> >> >>>>>>>>>> Peter Huang <huangzhenqiu0...@gmail.com> wrote on Tue, Dec 10, 2019 at 2:37 AM:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Dear All,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Recently, the Flink community started to improve the YARN cluster
> >> >>>>>>>>>>> descriptor to make the job jar and config files configurable from the
> >> >>>>>>>>>>> CLI. This improves the flexibility of Flink deployment in the YARN
> >> >>>>>>>>>>> per-job mode. For platform users who manage tens to hundreds of
> >> >>>>>>>>>>> streaming pipelines for a whole org or company, we found that the job
> >> >>>>>>>>>>> graph generation on the client side is another pain point. Thus, we
> >> >>>>>>>>>>> want to propose a configurable feature for FlinkYarnSessionCli. The
> >> >>>>>>>>>>> feature allows users to choose job graph generation in the Flink
> >> >>>>>>>>>>> ClusterEntryPoint, so that the job jar doesn't need to be local for the
> >> >>>>>>>>>>> job graph generation. The proposal is organized as a FLIP:
> >> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Delayed+JobGraph+Generation
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Any questions and suggestions are welcome. Thank you in advance.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best Regards
> >> >>>>>>>>>>> Peter Huang
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >>
> >>
>
