Thanks for the comments, Zhu and Matthias.

@Zhu Zhu

> How about disabling the checkpoint to avoid the cost? I know the cost is 
> there even if we disable the checkpoint at the moment. But I think it can be 
> fixed.
> If HA is disabled, the jobmanager needs to directly participate in all blob 
> shipping work which may result in a hot-spot.

Currently, there are several persistence services that have specific
implementations based on the HA mode:
- JobGraphStore and JobResultStore: These are related to job recovery
and can cause significant redundant I/O in OLAP scenarios, impacting
performance. It may be necessary to configure them as in-memory stores
for OLAP.
- CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
overhead by disabling checkpoints. I agree to remove Checkpoint
Storage from the scope of this FLIP.
- BlobStore: Agree that disabling BlobStore can potentially lead to
hotspots in JobManagers. However, enabling it in OLAP scenarios can
also result in high external storage access overhead , e.g.
JobInformation/ShuffleDescriptor in TDD. I think this is a trade-off.
In our internal benchmark for short query (with 128 parallelism
WordCount jobs), disabling BlobStore resulted in a 100% increase in
QPS. Therefore, I lean towards disabling it. WDYT?

> FLINK-24038

As Matthias mentioned, each component still needs to write its RPC
address, so this part of the writing may be unavoidable.

@Zhu Zhu @Matthias

> I don't see why the PersistenceServices needs to have access to the 
> MaterialProvider. I feel like there shouldn't be a component that's shared 
> between the LeaderElectionService and the PersistenceServices.
> The corresponding ZooKeeper/k8s implementation would hold the client instance 
> (which is the only thing that should be shared between the 
> LeaderElectionService and the PersistenceServices implementations).

Yes, I agree that this is the goal of splitting the interfaces.
However, when I attempted to split it, I found that these two services
still have implicit temporal dependencies, such as the closure of the
client instance and the cleanup of services and job data.

Regards the refactoring of HighAvailabilityServices, I try to
summarize the following issues that need to be considered:
- Splitting LeaderServices and PersistenceServices; As Matthias
mentioned, this allows for easier testing.
- Removal of deprecated interfaces, such as getWebMonitorLeaderElectionService.
- Reviewing existing multiple close and cleanup interfaces.
- Integration of StandaloneHaServices and EmbeddedHaServices.
I think this topic might be big enough to have a separate discussion
thread. I am now inclined to focus on the discussion of HA
functionality in the OLAP scenario in FLIP-403 and exclude the
refactoring from the scope of this FLIP. This way, we can simply
return different persistence services in AbstractHaServices based on
the configuration. And I'm willing to file a new FLIP (or perhaps a
ticket would be sufficient) for the refactoring of HA. WDYT?


Best,
Yangze Guo

On Thu, Jan 11, 2024 at 12:19 AM Matthias Pohl
<matthias.p...@aiven.io.invalid> wrote:
>
> Thanks for joining the discussion, everyone and sorry for picking it up
> that late. Here are a few points, I want to add to this discussion:
>
> - FLINK-24038 [1] led to a reduction of the curator/k8s client leader
> election requests by having a single leader election per JM rather than
> individual once per RPCEndpoint. We still need to have one record per
> component/RPCEndpoint (i.e. Dispatcher, RM, JobMaster instances, ...),
> though, because we need to save the address for RPC calls (Akka/Pekko) per
> component (each JobMaster has its own RPC endpoint with a dedicated port).
> That is why we cannot get rid of the individual entries/znodes per job.
>
> - An alternative for this FLIP's proposal would be to stick to the current
> HighAvailabilityServices interface. We could come up with a new
> implementation that does provide Standalone instances of what you call
> PersistentServices in this FLIP. That would reduce the efforts that come
> with refactoring the HighAvailabilityServices interface. It should be
> discussed here as an alternative and probably mentioned in the FLIP as a
> rejected alternative if the community agrees.
>
> - From a conceptual point of view, splitting the HighAvailabilityServices
> into LeaderElectionService and PersistentServices (I'm wondering whether
> something like JobHighAvailabilityServices would be more descriptive here.
> The word "persistence" is a bit ambiguous and can also be used in scenarios
> other than HA) makes sense in my opinion. One hint why separating this big
> interface HighAvailabilityServices into two smaller interfaces would make
> sense is the fact that there is a test
> implementation EmbeddedHaServicesWithLeadershipControl right now that
> provides embedded HA with helper methods to control the LeaderElection in
> ITCases. It is a workaround to get access to leader election. With two
> separate interfaces, we could make it easier to test these things.
>
> - I'm not too sure about the proposed class hierarchy of FLIP-403:
>   - What are the semantics of the "MaterialProvider". The name doesn't give
> me any hints on the interface/class purpose. There could be some
> description for this component being added to the FLIP. But on another
> note: I don't see why the PersistenceServices needs to have access to the
> MaterialProvider. I feel like there shouldn't be a component that's shared
> between the LeaderElectionService and the PersistenceServices.
>   - Alternative: What about coming up with a factory interface
> HighAvailabilityServicesFactory which provides two methods:
> createLeaderElectionService & createPersistenceServices. The factory
> wouldn't need to keep any instances (as suggested by this FLIP's
> HighAvailabilityServices component. It's a plain factory component that
> creates instances. The corresponding ZooKeeper/k8s implementation would
> hold the client instance (which is the only thing that should be shared
> between the LeaderElectionService and the PersistenceServices
> implementations). The factory would live in the ClusterEntrypoint. Any
> cleanup of HA data would be covered by the
> LeaderElection|PersistenceServices, individually.
>
> Looking forward to your opinions.
> Best,
> Matthias
>
> On Tue, Jan 9, 2024 at 1:23 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > > I would treat refactoring as a technical debt...
> >
> > Sorry I don't quite get the needs of the refactoring work.
> >
> > The refactoring work brings benefits if there are requirements to combine
> > different leader election services and persistence services.
> > The answer in this FLIP is to combine DefaultLeaderServices and
> > EmbeddedPersistenceServices. But I'm concerned that, if the goal is to
> > avoid the cost of job recovery, disable the persistence of the overall
> > cluster might be an overkill. e.g. if later we want the cluster partitions
> > to be recovered after JM failover?
> >
> > Yet I do not think of the needs of other new combinations at the moment,
> > e.g. a non-HA leader election service with an HA persistence service,
> > a ZK leader election service with a K8s persistence service. Maybe you
> > have some good cases for it?
> >
> > TBH, the current class structure looks simpler to me. I'm also wondering
> > whether it's possible to merge StandaloneHaServices with
> > EmbeddedHaServices,
> > because the latter one is a special case(all components in the same
> > process)
> > of the former one.
> >
> > > it still involves creating a znode or writing to the configmap
> > for each job
> >
> > Is it possible to avoid the cost? My gut feeling is that these actions
> > are not necessary after Flink does leader election for the overall master
> > process.
> >
> > > such as checkpoint and blob storage except for the job graph store
> >
> > How about disabling the checkpoint to avoid the cost? I know the cost is
> > there
> > even if we disable the checkpoint at the moment. But I think it can be
> > fixed.
> > Checkpoint is not needed if job recovery is not needed, the concepts are
> > highly related.
> >
> > Regarding blob storage, I'm not sure whether it's good to disable HA for
> > it.
> > If HA is disabled, the jobmanager needs to directly participate in all blob
> > shipping work which may result in a hot-spot.
> >
> > WDYT?
> >
> > Thanks,
> > Zhu
> >
> > Yangze Guo <karma...@gmail.com> 于2024年1月9日周二 10:55写道:
> >
> > > Thank you for your comments, Zhu!
> > >
> > > 1. I would treat refactoring as a technical debt and a side effect of
> > > this FLIP. The idea is inspired by Matthias' comments in [1]. It
> > > suggests having a single implementation of HighAvailabilityServices
> > > that requires a factory method for persistence services and leader
> > > services. After this, we will achieve a clearer class hierarchy for
> > > HAServices and eliminate code duplication.
> > >
> > > 2. While FLINK-24038 does eliminate the leader election time cost for
> > > each job, it still involves creating a znode or writing to the
> > > configmap for each job, which can negatively impact performance under
> > > higher workloads. This also applies to all other persistence services
> > > such as checkpoint and blob storage except for the job graph store.
> > >
> > > WDYT?
> > >
> > > [1]
> > >
> > https://issues.apache.org/jira/browse/FLINK-31816?focusedCommentId=17741054&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17741054
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Mon, Jan 8, 2024 at 7:37 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > >
> > > > Thanks for creating the FLIP and starting the discussion, Yangze. It
> > > makes
> > > > sense to me to improve the job submission performance in OLAP
> > scenarios.
> > > >
> > > > I have a few questions regarding the proposed changes:
> > > >
> > > > 1. How about skipping the job graph persistence if the proposed config
> > > > 'high-availability.enable-job-recovery' is set to false? In this way,
> > > > we do not need to do the refactoring work.
> > > >
> > > > 2. Instead of using different HA services for Dispatcher and JobMaster.
> > > > Can we leverage the work of FLINK-24038 to eliminate the leader
> > election
> > > > time cost of each job? Honestly I had thought it was already the truth
> > > but
> > > > seems it is not. This improvement can also benefit non-OLAP jobs.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Yangze Guo <karma...@gmail.com> 于2024年1月8日周一 17:11写道:
> > > >
> > > > > Thanks for the pointer, Rui!
> > > > >
> > > > > I have reviewed FLIP-383, and based on my understanding, this feature
> > > > > should be enabled by default for batch jobs in the future. Therefore,
> > > > > +1 for checking the parameters and issuing log warnings when the user
> > > > > explicitly configures execution.batch.job-recovery.enabled to true.
> > > > >
> > > > > +1 for high-availability.job-recovery.enabled, which would be more
> > > > > suitable with YAML hierarchy.
> > > > >
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Mon, Jan 8, 2024 at 3:43 PM Rui Fan <1996fan...@gmail.com> wrote:
> > > > > >
> > > > > > Thanks to Yangze driving this proposal!
> > > > > >
> > > > > > Overall looks good to me! This proposal is useful for
> > > > > > the performance when the job doesn't need the failover.
> > > > > >
> > > > > > I have some minor questions:
> > > > > >
> > > > > > 1. How does it work with FLIP-383[1]?
> > > > > >
> > > > > > This FLIP introduces a high-availability.enable-job-recovery,
> > > > > > and FLIP-383 introduces a execution.batch.job-recovery.enabled.
> > > > > >
> > > > > > IIUC, when high-availability.enable-job-recovery is false, the job
> > > > > > cannot recover even if execution.batch.job-recovery.enabled = true,
> > > > > > right?
> > > > > >
> > > > > > If so, could we check some parameters and warn some logs? Or
> > > > > > disable the execution.batch.job-recovery.enabled directly when
> > > > > > high-availability.enable-job-recovery = false.
> > > > > >
> > > > > > 2. Could we rename it to high-availability.job-recovery.enabled to
> > > unify
> > > > > > the naming?
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/x/QwqZE
> > > > > >
> > > > > > Best,
> > > > > > Rui
> > > > > >
> > > > > > On Mon, Jan 8, 2024 at 2:04 PM Yangze Guo <karma...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Thanks for your comment, Yong.
> > > > > > >
> > > > > > > Here are my thoughts on the splitting of HighAvailableServices:
> > > > > > > Firstly, I would treat this separation as a result of technical
> > > debt
> > > > > > > and a side effect of the FLIP. In order to achieve a cleaner
> > > interface
> > > > > > > hierarchy for High Availability before Flink 2.0, the design
> > > decision
> > > > > > > should not be limited to OLAP scenarios.
> > > > > > > I agree that the current HAServices can be divided based on
> > either
> > > the
> > > > > > > actual target (cluster & job) or the type of functionality
> > (leader
> > > > > > > election & persistence). From a conceptual perspective, I do not
> > > see
> > > > > > > one approach being better than the other. However, I have chosen
> > > the
> > > > > > > current separation for a clear separation of concerns. After
> > > FLIP-285,
> > > > > > > each process has a dedicated LeaderElectionService responsible
> > for
> > > > > > > leader election of all the components within it. This
> > > > > > > LeaderElectionService has its own lifecycle management. If we
> > were
> > > to
> > > > > > > split the HAServices into 'ClusterHighAvailabilityService' and
> > > > > > > 'JobHighAvailabilityService', we would need to couple the
> > lifecycle
> > > > > > > management of these two interfaces, as they both rely on the
> > > > > > > LeaderElectionService and other relevant classes. This coupling
> > and
> > > > > > > implicit design assumption will increase the complexity and
> > testing
> > > > > > > difficulty of the system. WDYT?
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Mon, Jan 8, 2024 at 12:08 PM Yong Fang <zjur...@gmail.com>
> > > wrote:
> > > > > > > >
> > > > > > > > Thanks Yangze for starting this discussion. I have one comment:
> > > why
> > > > > do we
> > > > > > > > need to abstract two services as `LeaderServices` and
> > > > > > > > `PersistenceServices`?
> > > > > > > >
> > > > > > > > From the content, the purpose of this FLIP is to make job
> > > failover
> > > > > more
> > > > > > > > lightweight, so it would be more appropriate to abstract two
> > > > > services as
> > > > > > > > `ClusterHighAvailabilityService` and
> > `JobHighAvailabilityService`
> > > > > instead
> > > > > > > > of `LeaderServices` and `PersistenceServices` based on leader
> > and
> > > > > store.
> > > > > > > In
> > > > > > > > this way, we can create a `JobHighAvailabilityService` that
> > has a
> > > > > leader
> > > > > > > > service and store for the job that meets the requirements based
> > > on
> > > > > the
> > > > > > > > configuration in the zk/k8s high availability service.
> > > > > > > >
> > > > > > > > WDYT?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Fang Yong
> > > > > > > >
> > > > > > > > On Fri, Dec 29, 2023 at 8:10 PM xiangyu feng <
> > > xiangyu...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Yangze for restart this discussion.
> > > > > > > > >
> > > > > > > > > +1 for the overall idea. By splitting the
> > > HighAvailabilityServices
> > > > > into
> > > > > > > > > LeaderServices and PersistenceServices, we may support
> > > configuring
> > > > > > > > > different storage behind them in the future.
> > > > > > > > >
> > > > > > > > > We did run into real problems in production where too much
> > job
> > > > > > > metadata was
> > > > > > > > > being stored on ZK, causing system instability.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yangze Guo <karma...@gmail.com> 于2023年12月29日周五 10:21写道:
> > > > > > > > >
> > > > > > > > > > Thanks for the response, Zhanghao.
> > > > > > > > > >
> > > > > > > > > > PersistenceServices sounds good to me.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yangze Guo
> > > > > > > > > >
> > > > > > > > > > On Wed, Dec 27, 2023 at 11:30 AM Zhanghao Chen
> > > > > > > > > > <zhanghao.c...@outlook.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Thanks for driving this effort, Yangze! The proposal
> > > overall
> > > > > LGTM.
> > > > > > > > > Other
> > > > > > > > > > from the throughput enhancement in the OLAP scenario, the
> > > > > separation
> > > > > > > of
> > > > > > > > > > leader election/discovery services and the metadata
> > > persistence
> > > > > > > services
> > > > > > > > > > will also make the HA impl clearer and easier to maintain.
> > > Just a
> > > > > > > minor
> > > > > > > > > > comment on naming: would it better to rename
> > > PersistentServices
> > > > > to
> > > > > > > > > > PersistenceServices, as usually we put a noun before
> > > Services?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Zhanghao Chen
> > > > > > > > > > > ________________________________
> > > > > > > > > > > From: Yangze Guo <karma...@gmail.com>
> > > > > > > > > > > Sent: Tuesday, December 19, 2023 17:33
> > > > > > > > > > > To: dev <dev@flink.apache.org>
> > > > > > > > > > > Subject: [DISCUSS] FLIP-403: High Availability Services
> > for
> > > > > OLAP
> > > > > > > > > > Scenarios
> > > > > > > > > > >
> > > > > > > > > > > Hi, there,
> > > > > > > > > > >
> > > > > > > > > > > We would like to start a discussion thread on "FLIP-403:
> > > High
> > > > > > > > > > > Availability Services for OLAP Scenarios"[1].
> > > > > > > > > > >
> > > > > > > > > > > Currently, Flink's high availability service consists of
> > > two
> > > > > > > > > > > mechanisms: leader election/retrieval services for
> > > JobManager
> > > > > and
> > > > > > > > > > > persistent services for job metadata. However, these
> > > > > mechanisms are
> > > > > > > > > > > set up in an "all or nothing" manner. In OLAP scenarios,
> > we
> > > > > > > typically
> > > > > > > > > > > only require leader election/retrieval services for
> > > JobManager
> > > > > > > > > > > components since jobs usually do not have a restart
> > > strategy.
> > > > > > > > > > > Additionally, the persistence of job states can
> > negatively
> > > > > impact
> > > > > > > the
> > > > > > > > > > > cluster's throughput, especially for short query jobs.
> > > > > > > > > > >
> > > > > > > > > > > To address these issues, this FLIP proposes splitting the
> > > > > > > > > > > HighAvailabilityServices into LeaderServices and
> > > > > > > PersistentServices,
> > > > > > > > > > > and enable users to independently configure the high
> > > > > availability
> > > > > > > > > > > strategies specifically related to jobs.
> > > > > > > > > > >
> > > > > > > > > > > Please find more details in the FLIP wiki document [1].
> > > Looking
> > > > > > > > > > > forward to your feedback.
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yangze Guo
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> >

Reply via email to