Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2023-12-26 Thread Zhanghao Chen
Thanks for driving this effort, Yangze! The proposal overall LGTM. Apart from
the throughput enhancement in the OLAP scenario, the separation of the leader
election/discovery services and the metadata persistence services will also
make the HA implementation clearer and easier to maintain. Just a minor comment
on naming: would it be better to rename PersistentServices to
PersistenceServices, since we usually put a noun before "Services"?

Best,
Zhanghao Chen

From: Yangze Guo 
Sent: Tuesday, December 19, 2023 17:33
To: dev 
Subject: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

Hi there,

We would like to start a discussion thread on "FLIP-403: High
Availability Services for OLAP Scenarios"[1].

Currently, Flink's high availability service consists of two
mechanisms: leader election/retrieval services for JobManager and
persistent services for job metadata. However, these mechanisms are
set up in an "all or nothing" manner. In OLAP scenarios, we typically
only require leader election/retrieval services for JobManager
components since jobs usually do not have a restart strategy.
Additionally, the persistence of job states can negatively impact the
cluster's throughput, especially for short query jobs.

To address these issues, this FLIP proposes splitting the
HighAvailabilityServices into LeaderServices and PersistentServices,
and enabling users to independently configure the high availability
strategies specifically related to jobs.
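
To make the split concrete, here is a rough sketch of the two interfaces
(the method sets are illustrative only; the authoritative definitions are
in the FLIP wiki [1], and the referenced types are Flink's existing ones):

    /** Sketch: the leader election/retrieval side of HA. */
    public interface LeaderServices extends AutoCloseable {
        LeaderElectionService getResourceManagerLeaderElectionService();
        LeaderRetrievalService getResourceManagerLeaderRetriever();
        LeaderElectionService getJobManagerLeaderElectionService(JobID jobId);
        LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobId);
    }

    /** Sketch: the metadata persistence side of HA. */
    public interface PersistentServices extends AutoCloseable {
        JobGraphStore getJobGraphStore();
        JobResultStore getJobResultStore();
        CheckpointRecoveryFactory getCheckpointRecoveryFactory();
        BlobStore getBlobStore();
    }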

Please find more details in the FLIP wiki document [1]. Looking
forward to your feedback.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-403+High+Availability+Services+for+OLAP+Scenarios

Best,
Yangze Guo


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2023-12-28 Thread Yangze Guo
Thanks for the response, Zhanghao.

PersistenceServices sounds good to me.

Best,
Yangze Guo


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2023-12-29 Thread xiangyu feng
Thanks Yangze for restarting this discussion.

+1 for the overall idea. By splitting the HighAvailabilityServices into
LeaderServices and PersistenceServices, we may support configuring
different storage behind them in the future.

We did run into real problems in production where too much job metadata was
being stored on ZK, causing system instability.


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-07 Thread Yong Fang
Thanks Yangze for starting this discussion. I have one comment: why do we
need to abstract two services as `LeaderServices` and
`PersistenceServices`?

From the content, the purpose of this FLIP is to make job failover more
lightweight, so it would be more appropriate to abstract the two services as
`ClusterHighAvailabilityService` and `JobHighAvailabilityService`, instead
of `LeaderServices` and `PersistenceServices` split by leader election and
storage. In this way, we can create a `JobHighAvailabilityService` that has
a leader service and a store for the job, meeting the requirements based on
the configuration in the zk/k8s high availability service.
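
For illustration, the alternative split might look roughly like this (the
names are from my suggestion above; the method sets are a sketch only, not
a concrete proposal):

    /** Sketch: HA concerns of cluster-level components (Dispatcher, RM, ...). */
    interface ClusterHighAvailabilityService {
        LeaderElectionService getDispatcherLeaderElectionService();
        LeaderRetrievalService getResourceManagerLeaderRetriever();
    }

    /** Sketch: HA concerns of a single job, bundling its leader election
     *  and its stores so they can be configured independently. */
    interface JobHighAvailabilityService {
        LeaderElectionService getJobMasterLeaderElectionService(JobID jobId);
        JobGraphStore getJobGraphStore();
        CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    }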

WDYT?

Best,
Fang Yong


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-07 Thread Yangze Guo
Thanks for your comment, Yong.

Here are my thoughts on the splitting of HighAvailabilityServices:
Firstly, I would treat this separation as a result of technical debt
and a side effect of the FLIP. In order to achieve a cleaner interface
hierarchy for High Availability before Flink 2.0, the design decision
should not be limited to OLAP scenarios.
I agree that the current HAServices can be divided based on either the
actual target (cluster & job) or the type of functionality (leader
election & persistence). From a conceptual perspective, I do not see
one approach being better than the other. However, I have chosen the
current separation for a clear separation of concerns. After FLIP-285,
each process has a dedicated LeaderElectionService responsible for
leader election of all the components within it. This
LeaderElectionService has its own lifecycle management. If we were to
split the HAServices into 'ClusterHighAvailabilityService' and
'JobHighAvailabilityService', we would need to couple the lifecycle
management of these two interfaces, as they both rely on the
LeaderElectionService and other relevant classes. This coupling and
implicit design assumption will increase the complexity and testing
difficulty of the system. WDYT?

Best,
Yangze Guo


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-07 Thread Rui Fan
Thanks to Yangze for driving this proposal!

Overall it looks good to me! This proposal is useful for
performance when a job doesn't need failover.

I have some minor questions:

1. How does it work with FLIP-383[1]?

This FLIP introduces high-availability.enable-job-recovery,
and FLIP-383 introduces execution.batch.job-recovery.enabled.

IIUC, when high-availability.enable-job-recovery is false, the job
cannot recover even if execution.batch.job-recovery.enabled = true,
right?

If so, could we validate these parameters and log a warning? Or
disable execution.batch.job-recovery.enabled directly when
high-availability.enable-job-recovery = false.
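
A minimal sketch of the kind of check I mean (option keys as named above;
the defaults and the configuration access are simplified for illustration):

    import java.util.Map;

    final class JobRecoveryOptionsCheck {
        /** Warn when the batch-recovery flag would be silently ignored. */
        static void check(Map<String, String> conf) {
            boolean haJobRecovery = Boolean.parseBoolean(
                    conf.getOrDefault("high-availability.enable-job-recovery", "true"));
            boolean batchJobRecovery = Boolean.parseBoolean(
                    conf.getOrDefault("execution.batch.job-recovery.enabled", "false"));
            if (!haJobRecovery && batchJobRecovery) {
                // Alternative: flip the batch option to false instead of warning.
                System.err.println(
                        "WARN: execution.batch.job-recovery.enabled is ignored "
                                + "because high-availability.enable-job-recovery is false.");
            }
        }
    }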

2. Could we rename it to high-availability.job-recovery.enabled to unify
the naming?

WDYT?

[1] https://cwiki.apache.org/confluence/x/QwqZE

Best,
Rui


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-08 Thread Yangze Guo
Thanks for the pointer, Rui!

I have reviewed FLIP-383, and based on my understanding, this feature
should be enabled by default for batch jobs in the future. Therefore,
+1 for checking the parameters and issuing log warnings when the user
explicitly configures execution.batch.job-recovery.enabled to true.

+1 for high-availability.job-recovery.enabled, which fits the YAML
hierarchy better.


Best,
Yangze Guo

Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-08 Thread Zhu Zhu
Thanks for creating the FLIP and starting the discussion, Yangze. It makes
sense to me to improve the job submission performance in OLAP scenarios.

I have a few questions regarding the proposed changes:

1. How about skipping the job graph persistence if the proposed config
'high-availability.enable-job-recovery' is set to false? In this way,
we do not need to do the refactoring work.

2. Instead of using different HA services for the Dispatcher and JobMaster,
can we leverage the work of FLINK-24038 to eliminate the leader election
time cost of each job? Honestly, I had thought this was already the case,
but it seems it is not. This improvement can also benefit non-OLAP jobs.

Thanks,
Zhu


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-08 Thread Yangze Guo
Thank you for your comments, Zhu!

1. I would treat refactoring as a technical debt and a side effect of
this FLIP. The idea is inspired by Matthias' comments in [1]. It
suggests having a single implementation of HighAvailabilityServices
that requires a factory method for persistence services and leader
services. After this, we will achieve a clearer class hierarchy for
HAServices and eliminate code duplication.

2. While FLINK-24038 does eliminate the leader election time cost for
each job, it still involves creating a znode or writing to the
configmap for each job, which can negatively impact performance under
higher workloads. This also applies to all other persistence services
such as checkpoint and blob storage except for the job graph store.

WDYT?

[1] 
https://issues.apache.org/jira/browse/FLINK-31816?focusedCommentId=17741054&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17741054

Best,
Yangze Guo


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-09 Thread Zhu Zhu
> I would treat refactoring as a technical debt...

Sorry I don't quite get the needs of the refactoring work.

The refactoring work brings benefits if there are requirements to combine
different leader election services and persistence services.
The answer in this FLIP is to combine DefaultLeaderServices and
EmbeddedPersistenceServices. But I'm concerned that, if the goal is to
avoid the cost of job recovery, disabling the persistence of the overall
cluster might be overkill, e.g. what if later we want the cluster partitions
to be recovered after a JM failover?

Yet I cannot think of a need for other new combinations at the moment,
e.g. a non-HA leader election service with an HA persistence service, or
a ZK leader election service with a K8s persistence service. Maybe you
have some good cases for them?

TBH, the current class structure looks simpler to me. I'm also wondering
whether it's possible to merge StandaloneHaServices with EmbeddedHaServices,
because the latter is a special case (all components in the same process)
of the former.

> it still involves creating a znode or writing to the configmap
> for each job

Is it possible to avoid the cost? My gut feeling is that these actions
are not necessary after Flink does leader election for the overall master
process.

> such as checkpoint and blob storage except for the job graph store

How about disabling the checkpoint to avoid the cost? I know the cost is
there even if we disable the checkpoint at the moment, but I think it can
be fixed. Checkpointing is not needed if job recovery is not needed; the
concepts are highly related.

Regarding blob storage, I'm not sure whether it's good to disable HA for it.
If HA is disabled, the jobmanager needs to directly participate in all blob
shipping work, which may result in a hot spot.

WDYT?

Thanks,
Zhu


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-10 Thread Matthias Pohl
Thanks for joining the discussion, everyone, and sorry for picking it up
so late. Here are a few points I want to add to this discussion:

- FLINK-24038 [1] led to a reduction of the curator/k8s client leader
election requests by having a single leader election per JM rather than
individual ones per RPCEndpoint. We still need to have one record per
component/RPCEndpoint (i.e. Dispatcher, RM, JobMaster instances, ...),
though, because we need to save the address for RPC calls (Akka/Pekko) per
component (each JobMaster has its own RPC endpoint with a dedicated port).
That is why we cannot get rid of the individual entries/znodes per job.

- An alternative to this FLIP's proposal would be to stick to the current
HighAvailabilityServices interface. We could come up with a new
implementation that does provide Standalone instances of what you call
PersistentServices in this FLIP. That would reduce the efforts that come
with refactoring the HighAvailabilityServices interface. It should be
discussed here as an alternative and probably mentioned in the FLIP as a
rejected alternative if the community agrees.

- From a conceptual point of view, splitting the HighAvailabilityServices
into LeaderElectionService and PersistentServices (I'm wondering whether
something like JobHighAvailabilityServices would be more descriptive here.
The word "persistence" is a bit ambiguous and can also be used in scenarios
other than HA) makes sense in my opinion. One hint why separating this big
interface HighAvailabilityServices into two smaller interfaces would make
sense is the fact that there is a test
implementation EmbeddedHaServicesWithLeadershipControl right now that
provides embedded HA with helper methods to control the LeaderElection in
ITCases. It is a workaround to get access to leader election. With two
separate interfaces, we could make it easier to test these things.

- I'm not too sure about the proposed class hierarchy of FLIP-403:
  - What are the semantics of the "MaterialProvider"? The name doesn't give
me any hints on the interface/class purpose. There could be some
description for this component added to the FLIP. But on another
note: I don't see why the PersistenceServices needs to have access to the
MaterialProvider. I feel like there shouldn't be a component that's shared
between the LeaderElectionService and the PersistenceServices.
  - Alternative: What about coming up with a factory interface
HighAvailabilityServicesFactory which provides two methods,
createLeaderElectionService & createPersistenceServices? The factory
wouldn't need to keep any instances (as suggested by this FLIP's
HighAvailabilityServices component); it's a plain factory component that
creates instances. The corresponding ZooKeeper/k8s implementation would
hold the client instance (which is the only thing that should be shared
between the LeaderElectionService and the PersistenceServices
implementations). The factory would live in the ClusterEntrypoint. Any
cleanup of HA data would be covered by the
LeaderElection|PersistenceServices, individually.
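
A minimal sketch of that factory idea (the signatures are illustrative
only and assume a Configuration parameter; the interface names follow
this thread):

    /** Sketch: a plain factory that keeps no instances itself. */
    interface HighAvailabilityServicesFactory {
        LeaderElectionService createLeaderElectionService(Configuration config)
                throws Exception;
        PersistenceServices createPersistenceServices(Configuration config)
                throws Exception;
    }

A ZooKeeper/k8s implementation of this factory would own the shared client
instance and hand it to both created services, keeping cleanup with the
individual services.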

Looking forward to your opinions.
Best,
Matthias


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-10 Thread Yangze Guo
Thanks for the comments, Zhu and Matthias.

@Zhu Zhu

> How about disabling the checkpoint to avoid the cost? I know the cost is 
> there even if we disable the checkpoint at the moment. But I think it can be 
> fixed.
> If HA is disabled, the jobmanager needs to directly participate in all blob 
> shipping work which may result in a hot-spot.

Currently, there are several persistence services that have specific
implementations based on the HA mode:
- JobGraphStore and JobResultStore: These are related to job recovery
and can cause significant redundant I/O in OLAP scenarios, impacting
performance. It may be necessary to configure them as in-memory stores
for OLAP.
- CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
overhead by disabling checkpoints. I agree to remove Checkpoint
Storage from the scope of this FLIP.
- BlobStore: Agree that disabling BlobStore can potentially lead to
hotspots in JobManagers. However, enabling it in OLAP scenarios can
also result in high external storage access overhead, e.g. for the
JobInformation/ShuffleDescriptor in the TDD. I think this is a trade-off.
In our internal benchmark for short queries (with 128-parallelism
WordCount jobs), disabling BlobStore resulted in a 100% increase in
QPS. Therefore, I lean towards disabling it. WDYT?
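
To make the JobGraphStore point concrete, a sketch of the
configuration-based selection (the option name follows this thread's
proposal; StandaloneJobGraphStore is Flink's existing no-op store, while
createHaJobGraphStore() is a placeholder for today's ZK/k8s-backed
creation):

    // Sketch: inside an AbstractHaServices-like base class.
    JobGraphStore createJobGraphStore(Configuration config) throws Exception {
        boolean jobRecoveryEnabled =
                config.getBoolean("high-availability.job-recovery.enabled", true);
        if (!jobRecoveryEnabled) {
            // In-memory/no-op store: no external I/O per job submission.
            return new StandaloneJobGraphStore();
        }
        // ZooKeeper/k8s-backed store, as today.
        return createHaJobGraphStore();
    }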

> FLINK-24038

As Matthias mentioned, each component still needs to write its RPC
address, so this part of the writing may be unavoidable.

@Zhu Zhu @Matthias

> I don't see why the PersistenceServices needs to have access to the 
> MaterialProvider. I feel like there shouldn't be a component that's shared 
> between the LeaderElectionService and the PersistenceServices.
> The corresponding ZooKeeper/k8s implementation would hold the client instance 
> (which is the only thing that should be shared between the 
> LeaderElectionService and the PersistenceServices implementations).

Yes, I agree that this is the goal of splitting the interfaces.
However, when I attempted to split it, I found that these two services
still have implicit temporal dependencies, such as the closure of the
client instance and the cleanup of services and job data.

Regarding the refactoring of HighAvailabilityServices, let me try to
summarize the issues that need to be considered:
- Splitting LeaderServices and PersistenceServices; as Matthias
mentioned, this allows for easier testing.
- Removal of deprecated interfaces, such as getWebMonitorLeaderElectionService.
- Reviewing the existing multiple close and cleanup interfaces.
- Integration of StandaloneHaServices and EmbeddedHaServices.
I think this topic might be big enough to warrant a separate discussion
thread. I am now inclined to focus FLIP-403 on the HA functionality in
the OLAP scenario and exclude the refactoring from the scope of this
FLIP. This way, we can simply return different persistence services in
AbstractHaServices based on the configuration. And I'm willing to file a
new FLIP (or perhaps a ticket would be sufficient) for the refactoring
of HA. WDYT?


Best,
Yangze Guo


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-15 Thread Zhu Zhu
@Yangze

> (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
> 100% increase in QPS

Did you look into which part takes most of the time? Jar uploading, Jar
downloading, JobInformation shipping, TDD shipping, or others?

If these objects are large, e.g. a connector jar of hundreds of megabytes,
will shipping it hundreds of times (if parallelism > 100) from JMs to TMs
become a blocker for performance and stability, compared to letting the DFS
help with the shipping? If yes, we should not force the use of a void
blobService. Maybe an option should be given to users to switch between
blobServices?

I'm fine to use a void blobService in OLAP scenarios if it works better
in most cases. However, it is a bit weird that we disable blobs if
`enable-job-recovery=false`. Conceptually, they should be unrelated.

> As Matthias mentioned, each component still needs to write its RPC
> address, so this part of the writing may be unavoidable.

Thanks Matthias for the inputs.
However, even in non-HA mode, the task manager can connect to the JobMaster.
Therefore, I guess it's not necessary to store JM addresses externally.
I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
accepts a parameter `defaultJobManagerAddress`, so maybe it's not needed
for TMs to find out the addresses of JMs via external services?
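
For reference, a sketch of what that could look like
(StandaloneLeaderRetrievalService and HighAvailabilityServices.DEFAULT_LEADER_ID
are Flink's existing non-HA building blocks; the wiring is simplified):

    // Sketch: with a known JobMaster address, the TM-side retriever can be
    // a fixed-address one instead of consulting ZK/k8s.
    LeaderRetrievalService getJobManagerLeaderRetriever(
            JobID jobId, String defaultJobManagerAddress) {
        return new StandaloneLeaderRetrievalService(
                defaultJobManagerAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
    }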

> focus on the discussion of HA functionality in the OLAP scenario in
> FLIP-403 and exclude the refactoring from the scope of this FLIP

It sounds good to me.
Actually, the concept of separating leader election and persistence
looks great to me at first glance, but the shared MaterialProvider
makes it more complicated than I had expected.

Thanks,
Zhu


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-15 Thread Zhu Zhu
Correction:
"I'm fine to use a void blobService in OLAP scenarios if it works better
in most cases." -> "I'm fine to use a void blobService in OLAP scenarios
*by default* if it works better in most cases."



Zhu Zhu  于2024年1月15日周一 17:51写道:

> @Yangze
>
> > (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
> 100% increase in QPS
>
> Did you look into which part takes most of the time? Jar uploading, Jar
> downloading, JobInformation shipping, TDD shipping, or others?
>
> If these objects are large, e.g. a hundreds megabytes connector jar,
> will ship it hundreds of times(if parallelism > 100) from JMs to TMs
> be a blocker of performance and stability, compared letting the DFS
> help with the shipping. If yes, we should not force it to use a void
> blobService. Maybe an option should be given to users to switch between
> blobServices?
>
> I'm fine to use a void blobService in OLAP scenarios if it works better
> in most cases. However, it is a bit weird that we disable blobs if
> `enable-job-recovery=false`. Conceptually, they should be unrelated.
>
> > As Matthias mentioned, each component still needs to write its RPC
> address, so this part of the writing may be unavoidable.
>
> Thanks Matthias for the inputs.
> However, even in non-ha mode, that task manager can connect to JobMaster.
> Therefore, I guess it's not necessary to store JM addresses externally.
> I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
> accepts a parameter `defaultJobManagerAddress`. So maybe it's not needed
> for TMs to find out the addresses of JMs via external services?
>
> > focus on the discussion of HA functionality in the OLAP scenario in
> FLIP-403 and exclude the refactoring from the scope of this FLIP
>
> It sounds good to me.
> Actually, the concept of separating leader election and persistence
> looks great to me at first glance. But the shared MaterialProvider
> makes it more complicated than I had expected.
>
> Thanks,
> Zhu
>
> Yangze Guo  wrote on Thu, Jan 11, 2024 at 14:53:
>
>> Thanks for the comments, Zhu and Matthias.
>>
>> @Zhu Zhu
>>
>> > How about disabling the checkpoint to avoid the cost? I know the cost
>> is there even if we disable the checkpoint at the moment. But I think it
>> can be fixed.
>> > If HA is disabled, the jobmanager needs to directly participate in all
>> blob shipping work which may result in a hot-spot.
>>
>> Currently, there are several persistence services that have specific
>> implementations based on the HA mode:
>> - JobGraphStore and JobResultStore: These are related to job recovery
>> and can cause significant redundant I/O in OLAP scenarios, impacting
>> performance. It may be necessary to configure them as in-memory stores
>> for OLAP.
>> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
>> overhead by disabling checkpoints. I agree to remove Checkpoint
>> Storage from the scope of this FLIP.
>> - BlobStore: Agree that disabling BlobStore can potentially lead to
>> hotspots in JobManagers. However, enabling it in OLAP scenarios can
>> also result in high external storage access overhead, e.g. for the
>> JobInformation/ShuffleDescriptor in the TDD. I think this is a trade-off.
>> In our internal benchmark for short queries (with 128 parallelism
>> WordCount jobs), disabling BlobStore resulted in a 100% increase in
>> QPS. Therefore, I lean towards disabling it. WDYT?
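To illustrate the in-memory direction mentioned for JobGraphStore /
JobResultStore above (placeholder types, not Flink's actual store
implementations), submission could skip the external round-trip entirely:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only: an in-memory stand-in so short OLAP queries never pay
    // the ZooKeeper/DFS write on submission.
    interface GraphStore {
        void putJobGraph(String jobId, byte[] serializedGraph);
    }

    final class InMemoryGraphStore implements GraphStore {
        private final Map<String, byte[]> graphs = new ConcurrentHashMap<>();

        @Override
        public void putJobGraph(String jobId, byte[] serializedGraph) {
            graphs.put(jobId, serializedGraph); // no external I/O
        }
    }

Based on the configuration, AbstractHaServices could then return such a
store instead of the ZooKeeper/k8s-backed one.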
>>
>> > FLINK-24038
>>
>> As Matthias mentioned, each component still needs to write its RPC
>> address, so this part of the writing may be unavoidable.
>>
>> @Zhu Zhu @Matthias
>>
>> > I don't see why the PersistenceServices needs to have access to the
>> MaterialProvider. I feel like there shouldn't be a component that's shared
>> between the LeaderElectionService and the PersistenceServices.
>> > The corresponding ZooKeeper/k8s implementation would hold the client
>> instance (which is the only thing that should be shared between the
>> LeaderElectionService and the PersistenceServices implementations).
>>
>> Yes, I agree that this is the goal of splitting the interfaces.
>> However, when I attempted to split it, I found that these two services
>> still have implicit temporal dependencies, such as the closure of the
>> client instance and the cleanup of services and job data.
>>
>> Regarding the refactoring of HighAvailabilityServices, let me try to
>> summarize the issues that need to be considered:
>> - Splitting LeaderServices and PersistenceServices; as Matthias
>> mentioned, this allows for easier testing.
>> - Removal of deprecated interfaces, such as
>> getWebMonitorLeaderElectionService.
>> - Reviewing existing multiple close and cleanup interfaces.
>> - Integration of StandaloneHaServices and EmbeddedHaServices.
>> I think this topic might be big enough to have a separate discussion
>> thread. I am now inclined to focus on the discussion of HA
>> functionality in the OLAP scenario in FLIP-403 and exclude the
>> refactoring from the scope of this FLIP. This way, we can simply
>> return different persistence services in AbstractHaServices based on
>> the configuration. And I'm willing to file a new FLIP (or perhaps a
>> ticket would be sufficient) for the refactoring of HA. WDYT?

Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-16 Thread Yangze Guo
Thanks for the comments, Zhu.

> Did you look into which part takes most of the time? Jar uploading, Jar 
> downloading, JobInformation shipping, TDD shipping, or others?

In our scenario, the key factor should be the JobInformation shipping,
as the jobs are completed within 1 second. This can have a big impact
on the QPS.

> If these objects are large, e.g. a connector jar of hundreds of megabytes,
> will shipping it hundreds of times (if parallelism > 100) from JMs to TMs
> become a blocker for performance and stability, compared to letting the DFS
> help with the shipping... I'm fine to use a void blobService in OLAP
> scenarios *by default* if it works better in most cases.

Thanks for the input. Currently, in our scenario, the connector jars
are pre-deployed on the JM and TM, and each job submission only
includes the serialized JobGraph. However, if custom connectors and
UDFs are involved in the future, I believe choosing the appropriate
blob strategy will indeed require further analysis. So, +1 for
providing users with the option to switch between blob services.
`high-availability.blob-store.enabled` sounds good from my side. We can
set it to false if it is not manually configured and
`high-availability.job-recovery.enabled` is set to false.
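A small sketch of that defaulting rule (option names as proposed in this
thread; the resolution logic below is only an illustration, not final code):

    import java.util.Optional;

    final class BlobStoreDefaulting {
        // high-availability.blob-store.enabled falls back to the value of
        // high-availability.job-recovery.enabled when not set explicitly.
        static boolean blobStoreEnabled(Optional<Boolean> configuredValue,
                                        boolean jobRecoveryEnabled) {
            return configuredValue.orElse(jobRecoveryEnabled);
        }

        public static void main(String[] args) {
            // Unset + job recovery disabled => blob store disabled.
            System.out.println(blobStoreEnabled(Optional.empty(), false));  // false
            // An explicit setting always wins over the fallback.
            System.out.println(blobStoreEnabled(Optional.of(true), false)); // true
        }
    }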

If there are no further comments, I will adjust the FLIP based on
these discussions and then initiate a vote.

Best,
Yangze Guo

On Mon, Jan 15, 2024 at 5:55 PM Zhu Zhu  wrote:
>
> Correction:
> I'm fine to use a void blobService in OLAP scenarios if it works better
> in most cases.  -> I'm fine to use a void blobService in OLAP scenarios
> *by default* if it works better in most cases.
>
>
>
> Zhu Zhu  wrote on Mon, Jan 15, 2024 at 17:51:
>
> > @Yangze
> >
> > > (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
> > 100% increase in QPS
> >
> > Did you look into which part takes most of the time? Jar uploading, Jar
> > downloading, JobInformation shipping, TDD shipping, or others?
> >
> > If these objects are large, e.g. a connector jar of hundreds of
> > megabytes, will shipping it hundreds of times (if parallelism > 100)
> > from JMs to TMs become a blocker for performance and stability,
> > compared to letting the DFS help with the shipping? If yes, we should
> > not force the use of a void blobService. Maybe an option should be
> > given to users to switch between blobServices?
> >
> > I'm fine to use a void blobService in OLAP scenarios if it works better
> > in most cases. However, it is a bit weird that we disable blobs if
> > `enable-job-recovery=false`. Conceptually, they should be unrelated.
> >
> > > As Matthias mentioned, each component still needs to write its RPC
> > address, so this part of the writing may be unavoidable.
> >
> > Thanks Matthias for the inputs.
> > However, even in non-HA mode, the TaskManager can connect to the JobMaster.
> > Therefore, I guess it's not necessary to store JM addresses externally.
> > I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
> > accepts a parameter `defaultJobManagerAddress`. So maybe TMs don't need
> > to find out the addresses of JMs via external services?
> >
> > > focus on the discussion of HA functionality in the OLAP scenario in
> > FLIP-403 and exclude the refactoring from the scope of this FLIP
> >
> > It sounds good to me.
> > Actually, the concept of separating leader election and persistence
> > looks great to me at first glance. But the shared MaterialProvider
> > makes it more complicated than I had expected.
> >
> > Thanks,
> > Zhu
> >
> > Yangze Guo  wrote on Thu, Jan 11, 2024 at 14:53:
> >
> >> Thanks for the comments, Zhu and Matthias.
> >>
> >> @Zhu Zhu
> >>
> >> > How about disabling the checkpoint to avoid the cost? I know the cost
> >> is there even if we disable the checkpoint at the moment. But I think it
> >> can be fixed.
> >> > If HA is disabled, the jobmanager needs to directly participate in all
> >> blob shipping work which may result in a hot-spot.
> >>
> >> Currently, there are several persistence services that have specific
> >> implementations based on the HA mode:
> >> - JobGraphStore and JobResultStore: These are related to job recovery
> >> and can cause significant redundant I/O in OLAP scenarios, impacting
> >> performance. It may be necessary to configure them as in-memory stores
> >> for OLAP.
> >> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
> >> overhead by disabling checkpoints. I agree to remove Checkpoint
> >> Storage from the scope of this FLIP.
> >> - BlobStore: Agree that disabling BlobStore can potentially lead to
> >> hotspots in JobManagers. However, enabling it in OLAP scenarios can
> >> also result in high external storage access overhead, e.g. for the
> >> JobInformation/ShuffleDescriptor in the TDD. I think this is a trade-off.
> >> In our internal benchmark for short queries (with 128 parallelism
> >> WordCount jobs), disabling BlobStore resulted in a 100% increase in
> >> QPS. Therefore, I lean towards disabling it. WDYT?
> >>
> >> > FLINK-24038

Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-16 Thread Yang Wang
I am completely in favor of splitting the LeaderServices and
PersistenceServices, while sharing the same concern that the
MaterialProvider is not very easy to understand. It just feels like we
do the separation, but not thoroughly.

If you have a clear plan for the subsequent improvements, I am fine
with focusing only on the OLAP requirements in FLIP-403.


Best,
Yang

On Wed, Jan 17, 2024 at 11:40 AM Yangze Guo  wrote:

> Thanks for the comments, Zhu.
>
> > Did you look into which part takes most of the time? Jar uploading, Jar
> downloading, JobInformation shipping, TDD shipping, or others?
>
> In our scenario, the key factor should be the JobInformation shipping,
> as the jobs are completed within 1 second. This can have a big impact
> on the QPS.
>
> > If these objects are large, e.g. a connector jar of hundreds of megabytes,
> > will shipping it hundreds of times (if parallelism > 100) from JMs to TMs
> > become a blocker for performance and stability, compared to letting the DFS
> > help with the shipping... I'm fine to use a void blobService in OLAP
> > scenarios *by default* if it works better in most cases.
>
> Thanks for the input. Currently, in our scenario, the connector jars
> are pre-deployed on the JM and TM, and each job submission only
> includes the serialized JobGraph. However, if custom connectors and
> UDFs are involved in the future, I believe choosing the appropriate
> blob strategy will indeed require further analysis. So, +1 for
> providing users with the option to switch between blob services.
> `high-availability.blob-store.enabled` sounds good from my side. We can
> set it to false if it is not manually configured and
> `high-availability.job-recovery.enabled` is set to false.
>
> If there are no further comments, I will adjust the FLIP based on
> these discussions and then initiate a vote.
>
> Best,
> Yangze Guo
>
> On Mon, Jan 15, 2024 at 5:55 PM Zhu Zhu  wrote:
> >
> > Correction:
> > I'm fine to use a void blobService in OLAP scenarios if it works better
> > in most cases.  -> I'm fine to use a void blobService in OLAP scenarios
> > *by default* if it works better in most cases.
> >
> >
> >
> > Zhu Zhu  wrote on Mon, Jan 15, 2024 at 17:51:
> >
> > > @Yangze
> > >
> > > > (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
> > > 100% increase in QPS
> > >
> > > Did you look into which part takes most of the time? Jar uploading, Jar
> > > downloading, JobInformation shipping, TDD shipping, or others?
> > >
> > > If these objects are large, e.g. a connector jar of hundreds of
> > > megabytes, will shipping it hundreds of times (if parallelism > 100)
> > > from JMs to TMs become a blocker for performance and stability,
> > > compared to letting the DFS help with the shipping? If yes, we should
> > > not force the use of a void blobService. Maybe an option should be
> > > given to users to switch between blobServices?
> > >
> > > I'm fine to use a void blobService in OLAP scenarios if it works better
> > > in most cases. However, it is a bit weird that we disable blobs if
> > > `enable-job-recovery=false`. Conceptually, they should be unrelated.
> > >
> > > > As Matthias mentioned, each component still needs to write its RPC
> > > address, so this part of the writing may be unavoidable.
> > >
> > > Thanks Matthias for the inputs.
> > > However, even in non-HA mode, the TaskManager can connect to the
> > > JobMaster.
> > > Therefore, I guess it's not necessary to store JM addresses externally.
> > > I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
> > > accepts a parameter `defaultJobManagerAddress`. So maybe TMs don't need
> > > to find out the addresses of JMs via external services?
> > >
> > > > focus on the discussion of HA functionality in the OLAP scenario in
> > > FLIP-403 and exclude the refactoring from the scope of this FLIP
> > >
> > > It sounds good to me.
> > > Actually, the concept of separating leader election and persistence
> > > looks great to me at first glance. But the shared MaterialProvider
> > > makes it more complicated than I had expected.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Yangze Guo  wrote on Thu, Jan 11, 2024 at 14:53:
> > >
> > >> Thanks for the comments, Zhu and Matthias.
> > >>
> > >> @Zhu Zhu
> > >>
> > >> > How about disabling the checkpoint to avoid the cost? I know the cost
> > >> is there even if we disable the checkpoint at the moment. But I think it
> > >> can be fixed.
> > >> > If HA is disabled, the jobmanager needs to directly participate in all
> > >> blob shipping work which may result in a hot-spot.
> > >>
> > >> Currently, there are several persistence services that have specific
> > >> implementations based on the HA mode:
> > >> - JobGraphStore and JobResultStore: These are related to job recovery
> > >> and can cause significant redundant I/O in OLAP scenarios, impacting
> > >> performance. It may be necessary to configure them as in-memory stores
> > >> for OLAP.
> > >> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
> > >>