Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-31 Thread Hangxiang Yu
Hi Yun.
Thanks for the great suggestion.
I just added the related information to the FLIP.

On Sat, Mar 30, 2024 at 10:49 AM Yun Tang  wrote:

> Hi Feifan,
>
> I just replied in the discussion of FLIP-428. I agree that we could leave
> the clean-up optimization to a future FLIP; however, I think we should
> mention this topic explicitly in the current FLIP to make the overall
> design complete and more sophisticated.
>
> Best
> Yun Tang
> 
> From: Feifan Wang 
> Sent: Thursday, March 28, 2024 12:35
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-427: Disaggregated State Store
>
> Thanks for your reply, Hangxiang. I totally agree with you about the JNI
> part.
>
> Hi Yun Tang, I just noticed that FLIP-427 mentions “The life cycle of
> working dir is managed as before local strategy.” IIUC, the working dir
> will be deleted after the TaskManager exits. And I think that's enough for
> the current stage, WDYT?
>
> ——
>
> Best regards,
>
> Feifan Wang
>
>
>
>
> At 2024-03-28 12:18:56, "Hangxiang Yu"  wrote:
> >Hi, Feifan.
> >
> >Thanks for your reply.
> >
> >What if we only use JNI to access DFS, which needs to reuse Flink FileSystem,
> >> and do all local disk access through the native API? This idea is based on the
> >> understanding that JNI overhead is not worth mentioning compared to DFS
> >> access latency. It might make more sense to consider avoiding JNI overhead
> >> for faster local disks. Since local disk as secondary storage is already under
> >> consideration [1], maybe we can discuss in that FLIP whether to use the native
> >> API to access the local disk?
> >>
> >This is a good suggestion. It's reasonable to use the native API to access
> >the local disk cache since it requires lower latency compared to remote access.
> >I also believe that the JNI overhead is relatively negligible when weighed
> >against the latency of remote I/O, as mentioned in the FLIP.
> >So I think we could go ahead with proposal 2 and keep proposal 1 as a
> >potential future optimization, which could work better when there is a
> >higher performance requirement or when some filesystems' native libraries offer
> >significantly better performance and resource usage than their Java libs.
> >
> >
> >On Thu, Mar 28, 2024 at 11:39 AM Feifan Wang  wrote:
> >
> >> Thanks for this valuable proposal, Hangxiang!
> >>
> >>
> >> > If we need to introduce a JNI call during each filesystem call, that
> >> would be N times the JNI cost compared with the current RocksDB state-backend's
> >> JNI cost.
> >> What if we only use JNI to access DFS, which needs to reuse Flink
> >> FileSystem, and do all local disk access through the native API? This idea is
> >> based on the understanding that JNI overhead is not worth mentioning
> >> compared to DFS access latency. It might make more sense to consider
> >> avoiding JNI overhead for faster local disks. Since local disk as secondary
> >> storage is already under consideration [1], maybe we can discuss in that FLIP
> >> whether to use the native API to access the local disk?
> >>
> >>
> >> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
> >> >Different disaggregated state storages may have their own semantics about
> >> >this configuration, e.g. life cycle, supported file systems or storages.
> >> I agree with deferring moving this configuration up to the engine level
> >> until there are other disaggregated backends.
> >>
> >>
> >> [1] https://cwiki.apache.org/confluence/x/U4p3EQ
> >>
> >> ——
> >>
> >> Best regards,
> >>
> >> Feifan Wang
> >>
> >>
> >>
> >>
> >> At 2024-03-28 09:55:48, "Hangxiang Yu"  wrote:
> >> >Hi, Yun.
> >> >Thanks for the reply.
> >> >
> >> >The JNI cost you considered is right. As replied to Yue, I agreed to leave
> >> >space and consider proposal 1 as an optimization in the future, which is
> >> >also updated in the FLIP.
> >> >
> >> >The other question is that the configuration of
> >> >> `state.backend.forSt.working-dir` looks too coupled with the ForSt
> >> >> state-backend, how would it be if we introduce another disaggregated state
> >> >> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
> >> >> better configuration name.

Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-29 Thread Yun Tang
Hi Feifan,

I just replied in the discussion of FLIP-428. I agree that we could leave the
clean-up optimization to a future FLIP; however, I think we should mention
this topic explicitly in the current FLIP to make the overall design complete
and more sophisticated.

Best
Yun Tang

From: Feifan Wang 
Sent: Thursday, March 28, 2024 12:35
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-427: Disaggregated State Store

Thanks for your reply, Hangxiang. I totally agree with you about the JNI part.

Hi Yun Tang, I just noticed that FLIP-427 mentions “The life cycle of working
dir is managed as before local strategy.” IIUC, the working dir will be deleted
after the TaskManager exits. And I think that's enough for the current stage, WDYT?

——

Best regards,

Feifan Wang




At 2024-03-28 12:18:56, "Hangxiang Yu"  wrote:
>Hi, Feifan.
>
>Thanks for your reply.
>
>What if we only use JNI to access DFS, which needs to reuse Flink FileSystem,
>> and do all local disk access through the native API? This idea is based on the
>> understanding that JNI overhead is not worth mentioning compared to DFS
>> access latency. It might make more sense to consider avoiding JNI overhead
>> for faster local disks. Since local disk as secondary storage is already under
>> consideration [1], maybe we can discuss in that FLIP whether to use the
>> native API to access the local disk?
>>
>This is a good suggestion. It's reasonable to use the native API to access
>the local disk cache since it requires lower latency compared to remote access.
>I also believe that the JNI overhead is relatively negligible when weighed
>against the latency of remote I/O, as mentioned in the FLIP.
>So I think we could go ahead with proposal 2 and keep proposal 1 as a
>potential future optimization, which could work better when there is a
>higher performance requirement or when some filesystems' native libraries offer
>significantly better performance and resource usage than their Java libs.
>
>
>On Thu, Mar 28, 2024 at 11:39 AM Feifan Wang  wrote:
>
>> Thanks for this valuable proposal, Hangxiang!
>>
>>
>> > If we need to introduce a JNI call during each filesystem call, that
>> > would be N times the JNI cost compared with the current RocksDB state-backend's
>> > JNI cost.
>> What if we only use JNI to access DFS, which needs to reuse Flink
>> FileSystem, and do all local disk access through the native API? This idea is
>> based on the understanding that JNI overhead is not worth mentioning
>> compared to DFS access latency. It might make more sense to consider
>> avoiding JNI overhead for faster local disks. Since local disk as secondary
>> storage is already under consideration [1], maybe we can discuss in that FLIP
>> whether to use the native API to access the local disk?
>>
>>
>> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
>> >Different disaggregated state storages may have their own semantics about
>> >this configuration, e.g. life cycle, supported file systems or storages.
>> I agree with deferring moving this configuration up to the engine level
>> until there are other disaggregated backends.
>>
>>
>> [1] https://cwiki.apache.org/confluence/x/U4p3EQ
>>
>> ——
>>
>> Best regards,
>>
>> Feifan Wang
>>
>>
>>
>>
>> At 2024-03-28 09:55:48, "Hangxiang Yu"  wrote:
>> >Hi, Yun.
>> >Thanks for the reply.
>> >
>> >The JNI cost you considered is right. As replied to Yue, I agreed to leave
>> >space and consider proposal 1 as an optimization in the future, which is
>> >also updated in the FLIP.
>> >
>> >The other question is that the configuration of
>> >> `state.backend.forSt.working-dir` looks too coupled with the ForSt
>> >> state-backend, how would it be if we introduce another disaggregated state
>> >> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
>> >> better configuration name.
>> >
>> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
>> >Different disaggregated state storages may have their own semantics about
>> >this configuration, e.g. life cycle, supported file systems or storages.
>> >Maybe it's more suitable to consider it together when we introduce other
>> >disaggregated state storages in the future.
>> >
>> >On Thu, Mar 28, 2024 at 12:02 AM Yun Tang  wrote:
>> >
>> >> Hi Hangxiang,
>> >>
>> >> The design looks good, and I also support leaving space for proposal 1.

Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-27 Thread Feifan Wang
Thanks for your reply, Hangxiang. I totally agree with you about the JNI part.

Hi Yun Tang, I just noticed that FLIP-427 mentions “The life cycle of working
dir is managed as before local strategy.” IIUC, the working dir will be deleted
after the TaskManager exits. And I think that's enough for the current stage, WDYT?
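Purely as an illustration, tying a working directory's life cycle to process
exit can be as simple as a JVM shutdown hook; a hedged sketch (Flink's actual
cleanup goes through the TaskManager's resource management, not a raw hook):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class WorkingDirCleanup {
    public static void registerCleanup(Path workingDir) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try (Stream<Path> files = Files.walk(workingDir)) {
                // Delete children before their parent directories.
                files.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            } catch (IOException e) {
                // Best effort during shutdown; nothing sensible to do here.
            }
        }));
    }
}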

——

Best regards,

Feifan Wang




At 2024-03-28 12:18:56, "Hangxiang Yu"  wrote:
>Hi, Feifan.
>
>Thanks for your reply.
>
>What if we only use JNI to access DFS, which needs to reuse Flink FileSystem,
>> and do all local disk access through the native API? This idea is based on the
>> understanding that JNI overhead is not worth mentioning compared to DFS
>> access latency. It might make more sense to consider avoiding JNI overhead
>> for faster local disks. Since local disk as secondary storage is already under
>> consideration [1], maybe we can discuss in that FLIP whether to use the
>> native API to access the local disk?
>>
>This is a good suggestion. It's reasonable to use the native API to access
>the local disk cache since it requires lower latency compared to remote access.
>I also believe that the JNI overhead is relatively negligible when weighed
>against the latency of remote I/O, as mentioned in the FLIP.
>So I think we could go ahead with proposal 2 and keep proposal 1 as a
>potential future optimization, which could work better when there is a
>higher performance requirement or when some filesystems' native libraries offer
>significantly better performance and resource usage than their Java libs.
>
>
>On Thu, Mar 28, 2024 at 11:39 AM Feifan Wang  wrote:
>
>> Thanks for this valuable proposal, Hangxiang!
>>
>>
>> > If we need to introduce a JNI call during each filesystem call, that
>> > would be N times the JNI cost compared with the current RocksDB state-backend's
>> > JNI cost.
>> What if we only use JNI to access DFS, which needs to reuse Flink
>> FileSystem, and do all local disk access through the native API? This idea is
>> based on the understanding that JNI overhead is not worth mentioning
>> compared to DFS access latency. It might make more sense to consider
>> avoiding JNI overhead for faster local disks. Since local disk as secondary
>> storage is already under consideration [1], maybe we can discuss in that FLIP
>> whether to use the native API to access the local disk?
>>
>>
>> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
>> >Different disaggregated state storages may have their own semantics about
>> >this configuration, e.g. life cycle, supported file systems or storages.
>> I agree with deferring moving this configuration up to the engine level
>> until there are other disaggregated backends.
>>
>>
>> [1] https://cwiki.apache.org/confluence/x/U4p3EQ
>>
>> ——
>>
>> Best regards,
>>
>> Feifan Wang
>>
>>
>>
>>
>> At 2024-03-28 09:55:48, "Hangxiang Yu"  wrote:
>> >Hi, Yun.
>> >Thanks for the reply.
>> >
>> >The JNI cost you considered is right. As replied to Yue, I agreed to leave
>> >space and consider proposal 1 as an optimization in the future, which is
>> >also updated in the FLIP.
>> >
>> >The other question is that the configuration of
>> >> `state.backend.forSt.working-dir` looks too coupled with the ForSt
>> >> state-backend, how would it be if we introduce another disaggregated state
>> >> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
>> >> better configuration name.
>> >
>> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
>> >Different disaggregated state storages may have their own semantics about
>> >this configuration, e.g. life cycle, supported file systems or storages.
>> >Maybe it's more suitable to consider it together when we introduce other
>> >disaggregated state storages in the future.
>> >
>> >On Thu, Mar 28, 2024 at 12:02 AM Yun Tang  wrote:
>> >
>> >> Hi Hangxiang,
>> >>
>> >> The design looks good, and I also support leaving space for proposal 1.
>> >>
>> >> As you know, loading index/filter/data blocks for querying across levels
>> >> would introduce high IO access within the LSM tree for old data. If we need
>> >> to introduce a JNI call during each filesystem call, that would be N times
>> >> the JNI cost compared with the current RocksDB state-backend's JNI cost.
>> >>
>> >> The other question is that the configuration of
>> >> `state.backend.forSt.working-dir` looks too coupled with the ForSt
>> >> state-backend, how would it be if we introduce another disaggregated state
>> >> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
>> >> better configuration name.

Re: Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-27 Thread Hangxiang Yu
Hi, Feifan.

Thanks for your reply.

What if we only use JNI to access DFS, which needs to reuse Flink FileSystem,
> and do all local disk access through the native API? This idea is based on the
> understanding that JNI overhead is not worth mentioning compared to DFS
> access latency. It might make more sense to consider avoiding JNI overhead
> for faster local disks. Since local disk as secondary storage is already under
> consideration [1], maybe we can discuss in that FLIP whether to use the
> native API to access the local disk?
>
This is a good suggestion. It's reasonable to use the native API to access
the local disk cache since it requires lower latency compared to remote access.
I also believe that the JNI overhead is relatively negligible when weighed
against the latency of remote I/O, as mentioned in the FLIP.
So I think we could go ahead with proposal 2 and keep proposal 1 as a
potential future optimization, which could work better when there is a
higher performance requirement or when some filesystems' native libraries offer
significantly better performance and resource usage than their Java libs.
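To make the JNI boundary concrete, here is a minimal sketch of the Java side
that ForSt's native code could call back into; the class and method names are
illustrative assumptions, not the actual FLIP-427 API:

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

// Hypothetical JNI entry points: native file-system calls from ForSt are
// forwarded here, so any Flink-supported scheme (hdfs://, s3://, oss://)
// works without a dedicated native client.
public final class StateFileSystemBridge {

    // Invoked from native code to open a file on a Flink FileSystem.
    public static FSDataInputStream open(String uri) throws IOException {
        Path path = new Path(uri);
        return path.getFileSystem().open(path);
    }

    // Positioned read: native code supplies the offset and a buffer; returns
    // the number of bytes read, or -1 at the end of the stream.
    public static int pread(FSDataInputStream in, long offset, byte[] buffer)
            throws IOException {
        in.seek(offset);
        return in.read(buffer, 0, buffer.length);
    }
}

Each native read crosses this boundary once, which is the per-call JNI cost
weighed above against remote I/O latency.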


On Thu, Mar 28, 2024 at 11:39 AM Feifan Wang  wrote:

> Thanks for this valuable proposal, Hangxiang!
>
>
> > If we need to introduce a JNI call during each filesystem call, that
> would be N times the JNI cost compared with the current RocksDB state-backend's
> JNI cost.
> What if we only use JNI to access DFS, which needs to reuse Flink
> FileSystem, and do all local disk access through the native API? This idea is
> based on the understanding that JNI overhead is not worth mentioning
> compared to DFS access latency. It might make more sense to consider
> avoiding JNI overhead for faster local disks. Since local disk as secondary
> storage is already under consideration [1], maybe we can discuss in that FLIP
> whether to use the native API to access the local disk?
>
>
> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
> >Different disaggregated state storages may have their own semantics about
> >this configuration, e.g. life cycle, supported file systems or storages.
> I agree with deferring moving this configuration up to the engine level
> until there are other disaggregated backends.
>
>
> [1] https://cwiki.apache.org/confluence/x/U4p3EQ
>
> ——
>
> Best regards,
>
> Feifan Wang
>
>
>
>
> At 2024-03-28 09:55:48, "Hangxiang Yu"  wrote:
> >Hi, Yun.
> >Thanks for the reply.
> >
> >The JNI cost you considered is right. As replied to Yue, I agreed to leave
> >space and consider proposal 1 as an optimization in the future, which is
> >also updated in the FLIP.
> >
> >The other question is that the configuration of
> >> `state.backend.forSt.working-dir` looks too coupled with the ForSt
> >> state-backend, how would it be if we introduce another disaggregated state
> >> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
> >> better configuration name.
> >
> >I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
> >Different disaggregated state storages may have their own semantics about
> >this configuration, e.g. life cycle, supported file systems or storages.
> >Maybe it's more suitable to consider it together when we introduce other
> >disaggregated state storages in the future.
> >
> >On Thu, Mar 28, 2024 at 12:02 AM Yun Tang  wrote:
> >
> >> Hi Hangxiang,
> >>
> >> The design looks good, and I also support leaving space for proposal 1.
> >>
> >> As you know, loading index/filter/data blocks for querying across levels
> >> would introduce high IO access within the LSM tree for old data. If we need
> >> to introduce a JNI call during each filesystem call, that would be N times
> >> the JNI cost compared with the current RocksDB state-backend's JNI cost.
> >>
> >> The other question is that the configuration of
> >> `state.backend.forSt.working-dir` looks too coupled with the ForSt
> >> state-backend, how would it be if we introduce another disaggregated state
> >> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
> >> better configuration name.
> >>
> >>
> >> Best
> >> Yun Tang
> >>
> >> 
> >> From: Hangxiang Yu 
> >> Sent: Wednesday, March 20, 2024 11:32
> >> To: dev@flink.apache.org 
> >> Subject: Re: [DISCUSS] FLIP-427: Disaggregated State Store
> >>
> >> Hi, Yue.
> >> Thanks for the reply.
> >>
> >> If we use proposal 1, we can easily reuse these optimizations. It is even
> >> possible to discuss and review the solution together in the RocksDB
> >> community.

Re:Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-27 Thread Feifan Wang
Thanks for this valuable proposal, Hangxiang!


> If we need to introduce a JNI call during each filesystem call, that would be
> N times the JNI cost compared with the current RocksDB state-backend's JNI cost.
What if we only use JNI to access DFS, which needs to reuse Flink FileSystem, and
do all local disk access through the native API? This idea is based on the
understanding that JNI overhead is not worth mentioning compared to DFS access
latency. It might make more sense to consider avoiding JNI overhead for faster
local disks. Since local disk as secondary storage is already under consideration
[1], maybe we can discuss in that FLIP whether to use the native API to access the
local disk?


>I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
>Different disaggregated state storages may have their own semantics about
>this configuration, e.g. life cycle, supported file systems or storages.
I agree with deferring moving this configuration up to the engine level until
there are other disaggregated backends.


[1] https://cwiki.apache.org/confluence/x/U4p3EQ

——

Best regards,

Feifan Wang




At 2024-03-28 09:55:48, "Hangxiang Yu"  wrote:
>Hi, Yun.
>Thanks for the reply.
>
>The JNI cost you considered is right. As replied to Yue, I agreed to leave
>space and consider proposal 1 as an optimization in the future, which is
>also updated in the FLIP.
>
>The other question is that the configuration of
>> `state.backend.forSt.working-dir` looks too coupled with the ForSt
>> state-backend, how would it be if we introduce another disaggregated state
>> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
>> better configuration name.
>
>I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
>Different disaggregated state storages may have their own semantics about
>this configuration, e.g. life cycle, supported file systems or storages.
>Maybe it's more suitable to consider it together when we introduce other
>disaggregated state storages in the future.
>
>On Thu, Mar 28, 2024 at 12:02 AM Yun Tang  wrote:
>
>> Hi Hangxiang,
>>
>> The design looks good, and I also support leaving space for proposal 1.
>>
>> As you know, loading index/filter/data blocks for querying across levels
>> would introduce high IO access within the LSM tree for old data. If we need
>> to introduce a JNI call during each filesystem call, that would be N times
>> the JNI cost compared with the current RocksDB state-backend's JNI cost.
>>
>> The other question is that the configuration of
>> `state.backend.forSt.working-dir` looks too coupled with the ForSt
>> state-backend, how would it be if we introduce another disaggregated state
>> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
>> better configuration name.
>>
>>
>> Best
>> Yun Tang
>>
>> 
>> From: Hangxiang Yu 
>> Sent: Wednesday, March 20, 2024 11:32
>> To: dev@flink.apache.org 
>> Subject: Re: [DISCUSS] FLIP-427: Disaggregated State Store
>>
>> Hi, Yue.
>> Thanks for the reply.
>>
>> If we use proposal 1, we can easily reuse these optimizations. It is even
>> > possible to discuss and review the solution together in the RocksDB
>> > community.
>>
>> We also saw these useful optimizations which could be applied to ForSt in
>> the future.
>> But IIUC, they're not bound to proposal 1, right? We could also
>> implement the interfaces for temperature and secondary cache to reuse them,
>> or organize a more complex HybridEnv based on proposal 2.
>>
>> My point is whether we should retain the potential of proposal 1 in the
>> > design.
>> >
>> This is a good suggestion. We chose proposal 2 first due to its
>> maintainability and scalability, especially because it can conveniently
>> leverage all the filesystems Flink supports.
>> Given the undeniable advantage in performance, I think we could also
>> consider proposal 1 as an optimization in the future.
>> For the interface on the DB side, we could also expose more different Envs
>> in the future.
>>
>>
>> On Tue, Mar 19, 2024 at 9:14 PM yue ma  wrote:
>>
>> > Hi Hangxiang,
>> >
>> > Thanks for bringing up this discussion.
>> > I have a few questions about the Proposal you mentioned in the FLIP.
>> >
>> > The current conclusion is to use proposal 2, which is okay for me. My point
>> > is whether we should retain the potential of proposal 1 in the design.
>> > There are the following reasons:
>> > 1. No JNI overhead, just like the Performance Part mentioned in the FLIP

Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-27 Thread Hangxiang Yu
Hi, Yun.
Thanks for the reply.

The JNI cost you considered is right. As replied to Yue, I agreed to leave
space and consider proposal 1 as an optimization in the future, which is
also updated in the FLIP.

The other question is that the configuration of
> `state.backend.forSt.working-dir` looks too coupled with the ForSt
> state-backend, how would it be if we introduce another disaggregated state
> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
> better configuration name.

I'd suggest keeping `state.backend.forSt.working-dir` as it is for now.
Different disaggregated state storages may have their own semantics about
this configuration, e.g. life cycle, supported file systems or storages.
Maybe it's more suitable to consider it together when we introduce other
disaggregated state storages in the future.
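For reference, such a key would typically be declared with Flink's
ConfigOptions. A minimal sketch (only the key name comes from the FLIP; the
description text is illustrative):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ForStOptions {
    // Working directory of the ForSt state backend on a (remote) file system.
    // Per this thread, its life cycle follows the TaskManager, as with the
    // local strategy.
    public static final ConfigOption<String> WORKING_DIR =
            ConfigOptions.key("state.backend.forSt.working-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The working directory of the ForSt state backend.");
}

Renaming it later to an engine-level key would then only touch this
declaration (plus a deprecated fallback key).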

On Thu, Mar 28, 2024 at 12:02 AM Yun Tang  wrote:

> Hi Hangxiang,
>
> The design looks good, and I also support leaving space for proposal 1.
>
> As you know, loading index/filter/data blocks for querying across levels
> would introduce high IO access within the LSM tree for old data. If we need
> to introduce a JNI call during each filesystem call, that would be N times
> the JNI cost compared with the current RocksDB state-backend's JNI cost.
>
> The other question is that the configuration of
> `state.backend.forSt.working-dir` looks too coupled with the ForSt
> state-backend, how would it be if we introduce another disaggregated state
> storage? Thus, I think `state.backend.disaggregated.working-dir` might be a
> better configuration name.
>
>
> Best
> Yun Tang
>
> 
> From: Hangxiang Yu 
> Sent: Wednesday, March 20, 2024 11:32
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-427: Disaggregated State Store
>
> Hi, Yue.
> Thanks for the reply.
>
> If we use proposal 1, we can easily reuse these optimizations. It is even
> > possible to discuss and review the solution together in the RocksDB
> > community.
>
> We also saw these useful optimizations which could be applied to ForSt in
> the future.
> But IIUC, they're not bound to proposal 1, right? We could also
> implement the interfaces for temperature and secondary cache to reuse them,
> or organize a more complex HybridEnv based on proposal 2.
>
> My point is whether we should retain the potential of proposal 1 in the
> > design.
> >
> This is a good suggestion. We chose proposal 2 first due to its
> maintainability and scalability, especially because it can conveniently
> leverage all the filesystems Flink supports.
> Given the undeniable advantage in performance, I think we could also
> consider proposal 1 as an optimization in the future.
> For the interface on the DB side, we could also expose more different Envs
> in the future.
>
>
> On Tue, Mar 19, 2024 at 9:14 PM yue ma  wrote:
>
> > Hi Hangxiang,
> >
> > Thanks for bringing up this discussion.
> > I have a few questions about the Proposal you mentioned in the FLIP.
> >
> > The current conclusion is to use proposal 2, which is okay for me. My point
> > is whether we should retain the potential of proposal 1 in the design.
> > There are the following reasons:
> > 1. No JNI overhead, just like the Performance Part mentioned in the FLIP
> > 2. RocksDB currently also provides an interface for Env, and there are also
> > some implementations, such as HDFS-ENV, which seem easy to extend.
> > 3. The RocksDB community continues to support LSM for different storage
> > media, such as Tiered Storage
> > <https://github.com/facebook/rocksdb/wiki/Tiered-Storage-%28Experimental%29>.
> > And some optimizations have been made for this scenario, such as Per
> > Key Placement Comparison
> > <https://rocksdb.org/blog/2022/11/09/time-aware-tiered-storage.html>.
> > *Secondary cache
> > <https://github.com/facebook/rocksdb/wiki/SecondaryCache-%28Experimental%29>*,
> > similar to the Hybrid Block Cache mentioned in FLIP-423.
> > If we use proposal 1, we can easily reuse these optimizations. It is even
> > possible to discuss and review the solution together in the RocksDB
> > community.
> > In fact, we have already implemented some production practices using
> > proposal 1 internally. We have integrated HybridEnv, Tiered Storage, and
> > Secondary Cache on RocksDB and optimized the performance of Checkpoint and
> > State Restore. It seems to work well for us.
> >
> > --
> > Best,
> > Yue
> >
>
>
> --
> Best,
> Hangxiang.
>


-- 
Best,
Hangxiang.


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-27 Thread Yun Tang
Hi Hangxiang,

The design looks good, and I also support leaving space for proposal 1.

As you know, loading index/filter/data blocks for querying across levels would 
introduce high IO access within the LSM tree for old data. If we need to 
introduce a JNI call during each filesystem call, that would be N times the JNI
cost compared with the current RocksDB state-backend's JNI cost.

The other question is that the configuration of 
`state.backend.forSt.working-dir` looks too coupled with the ForSt 
state-backend, how would it be if we introduce another disaggregated state 
storage? Thus, I think `state.backend.disaggregated.working-dir` might be a 
better configuration name.


Best
Yun Tang


From: Hangxiang Yu 
Sent: Wednesday, March 20, 2024 11:32
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-427: Disaggregated State Store

Hi, Yue.
Thanks for the reply.

If we use proposal 1, we can easily reuse these optimizations. It is even
> possible to discuss and review the solution together in the RocksDB
> community.

We also saw these useful optimizations which could be applied to ForSt in
the future.
But IIUC, they're not bound to proposal 1, right? We could also
implement the interfaces for temperature and secondary cache to reuse them,
or organize a more complex HybridEnv based on proposal 2.

My point is whether we should retain the potential of proposal 1 in the
> design.
>
This is a good suggestion. We chose proposal 2 first due to its
maintainability and scalability, especially because it can conveniently
leverage all the filesystems Flink supports.
Given the undeniable advantage in performance, I think we could also
consider proposal 1 as an optimization in the future.
For the interface on the DB side, we could also expose more different Envs
in the future.


On Tue, Mar 19, 2024 at 9:14 PM yue ma  wrote:

> Hi Hangxiang,
>
> Thanks for bringing up this discussion.
> I have a few questions about the Proposal you mentioned in the FLIP.
>
> The current conclusion is to use proposal 2, which is okay for me. My point
> is whether we should retain the potential of proposal 1 in the design.
> There are the following reasons:
> 1. No JNI overhead, just like the Performance Part mentioned in the FLIP
> 2. RocksDB currently also provides an interface for Env, and there are also
> some implementations, such as HDFS-ENV, which seem easy to extend.
> 3. The RocksDB community continues to support LSM for different storage
> media, such as Tiered Storage
> <https://github.com/facebook/rocksdb/wiki/Tiered-Storage-%28Experimental%29>.
> And some optimizations have been made for this scenario, such as Per
> Key Placement Comparison
> <https://rocksdb.org/blog/2022/11/09/time-aware-tiered-storage.html>.
> *Secondary cache
> <https://github.com/facebook/rocksdb/wiki/SecondaryCache-%28Experimental%29>*,
> similar to the Hybrid Block Cache mentioned in FLIP-423.
> If we use proposal 1, we can easily reuse these optimizations. It is even
> possible to discuss and review the solution together in the RocksDB
> community.
> In fact, we have already implemented some production practices using
> proposal 1 internally. We have integrated HybridEnv, Tiered Storage, and
> Secondary Cache on RocksDB and optimized the performance of Checkpoint and
> State Restore. It seems to work well for us.
>
> --
> Best,
> Yue
>


--
Best,
Hangxiang.


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-19 Thread Hangxiang Yu
Hi, Yue.
Thanks for the reply.

If we use proposal 1, we can easily reuse these optimizations. It is even
> possible to discuss and review the solution together in the RocksDB
> community.

We also saw these useful optimizations which could be applied to ForSt in
the future.
But IIUC, they're not bound to proposal 1, right? We could also
implement the interfaces for temperature and secondary cache to reuse them,
or organize a more complex HybridEnv based on proposal 2.

My point is whether we should retain the potential of proposal 1 in the
> design.
>
This is a good suggestion. We chose proposal 2 first due to its
maintainability and scalability, especially because it can conveniently
leverage all the filesystems Flink supports.
Given the undeniable advantage in performance, I think we could also
consider proposal 1 as an optimization in the future.
For the interface on the DB side, we could also expose more different Envs
in the future.


On Tue, Mar 19, 2024 at 9:14 PM yue ma  wrote:

> Hi Hangxiang,
>
> Thanks for bringing up this discussion.
> I have a few questions about the Proposal you mentioned in the FLIP.
>
> The current conclusion is to use proposal 2, which is okay for me. My point
> is whether we should retain the potential of proposal 1 in the design.
> There are the following reasons:
> 1. No JNI overhead, just like the Performance Part mentioned in the FLIP
> 2. RocksDB currently also provides an interface for Env, and there are also
> some implementations, such as HDFS-ENV, which seem easy to extend.
> 3. The RocksDB community continues to support LSM for different storage
> media, such as Tiered Storage
> <https://github.com/facebook/rocksdb/wiki/Tiered-Storage-%28Experimental%29>.
> And some optimizations have been made for this scenario, such as Per
> Key Placement Comparison
> <https://rocksdb.org/blog/2022/11/09/time-aware-tiered-storage.html>.
> *Secondary cache
> <https://github.com/facebook/rocksdb/wiki/SecondaryCache-%28Experimental%29>*,
> similar to the Hybrid Block Cache mentioned in FLIP-423.
> If we use proposal 1, we can easily reuse these optimizations. It is even
> possible to discuss and review the solution together in the RocksDB
> community.
> In fact, we have already implemented some production practices using
> proposal 1 internally. We have integrated HybridEnv, Tiered Storage, and
> Secondary Cache on RocksDB and optimized the performance of Checkpoint and
> State Restore. It seems to work well for us.
>
> --
> Best,
> Yue
>


-- 
Best,
Hangxiang.


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-19 Thread yue ma
Hi Hangxiang,

Thanks for bringing up this discussion.
I have a few questions about the Proposal you mentioned in the FLIP.

The current conclusion is to use proposal 2, which is okay for me. My point
is whether we should retain the potential of proposal 1 in the design.
There are the following reasons:
1. No JNI overhead, just like the Performance Part mentioned in the FLIP
2. RocksDB currently also provides an interface for Env, and there are also
some implementations, such as HDFS-ENV, which seem easy to extend (see the
sketch below).
3. The RocksDB community continues to support LSM for different storage
media, such as Tiered Storage
<https://github.com/facebook/rocksdb/wiki/Tiered-Storage-%28Experimental%29>.
And some optimizations have been made for this scenario, such as Per
Key Placement Comparison
<https://rocksdb.org/blog/2022/11/09/time-aware-tiered-storage.html>.
*Secondary cache
<https://github.com/facebook/rocksdb/wiki/SecondaryCache-%28Experimental%29>*,
similar to the Hybrid Block Cache mentioned in FLIP-423.
If we use proposal 1, we can easily reuse these optimizations. It is even
possible to discuss and review the solution together in the RocksDB
community.
In fact, we have already implemented some production practices using
proposal 1 internally. We have integrated HybridEnv, Tiered Storage, and
Secondary Cache on RocksDB and optimized the performance of Checkpoint and
State Restore. It seems to work well for us.
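For what it's worth, the same pluggable Env is exposed on the Java side by
RocksJava. A minimal sketch, assuming an HdfsEnv-style Env is available in
the RocksJava build (older releases shipped one; newer ones moved HDFS
support to a plugin):

import org.rocksdb.HdfsEnv;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class EnvExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // All SST/WAL/MANIFEST I/O of this DB goes through the custom Env.
        try (HdfsEnv hdfsEnv = new HdfsEnv("hdfs://namenode:9000");
             Options options = new Options()
                     .setCreateIfMissing(true)
                     .setEnv(hdfsEnv);
             RocksDB db = RocksDB.open(options, "/forst/db")) {
            db.put("key".getBytes(), "value".getBytes());
        }
    }
}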

-- 
Best,
Yue


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-19 Thread Hangxiang Yu
Hi everyone,

Thanks for your valuable feedback!

Our discussions have been going on for a while.
As a sub-FLIP of FLIP-423, which is nearing a consensus, I would like to
start a vote after 72 hours.

Please let me know if you have any concerns, thanks!

On Mon, Mar 11, 2024 at 11:48 AM Hangxiang Yu  wrote:

> Hi, Jeyhun.
>
> Thanks for the reply.
>
> Is this argument true for all workloads? Or does this argument also hold
> for workloads with many small files, which is quite a common case [1]?
>
> Yes, I think so. The overhead should still be considered negligible,
> particularly in comparison to remote I/O, and other benefits of this
> proposal may be more significant than this one.
>
> Additionally, there is JNI overhead when Flink calls RocksDB methods
> currently. The frequency of these calls could surpass that of actual file
> system interface calls, given that not all state requests require accessing
> the file system.
>
> BTW, the issue with small files can also impact the performance of the DB with
> the local file system at runtime, so we usually resolve this first in the
> production environment.
>
> the engine spawns a huge number of scan-range requests to the
> file system to retrieve different parts of a file.
>
> Indeed, frequent requests to the remote file system can significantly
> affect performance. To address this, other FLIPs have introduced various
> strategies:
>
> 1. Local disk cache to minimize remote requests as described in FLIP-423,
> which we will introduce in FLIP-429 as you mentioned. With effective cache
> utilization, the performance will not be inferior to the local strategy
> when the cache hits.
>
> 2. Grouping remote access to decrease the number of remote I/O requests,
> as proposed in "FLIP-426: Grouping Remote State Access."
>
> 3. Parallel I/O to maximize network bandwidth usage, outlined in
> "FLIP-425: Asynchronous Execution Model."
>
> The PoC implements a simple file cache and asynchronous execution, which
> improve the performance a lot. You could also refer to the PoC results in
> FLIP-423.
>
> On Mon, Mar 11, 2024 at 3:11 AM Jeyhun Karimov  wrote:
>
>> Hi Hangxiang,
>>
>> Thanks for the proposal. +1 for it.
>> I have a few comments.
>>
>> Proposal 2 has additional JNI overhead, but the overhead is relatively
>> > negligible when weighed against the latency of remote I/O.
>>
>> - Is this argument true for all workloads? Or does this argument also hold
>> for workloads with many small files, which is quite a common case [1]?
>>
>> - Also, in many workloads the engine does not need the whole file, either
>> because the query forces it, or because the file type supports efficient
>> filtering (e.g. ORC, Parquet, Arrow files), or simply because one file is
>> "divided" among multiple workers.
>> In these cases, the engine spawns a huge number of scan-range requests to the
>> file system to retrieve different parts of a file.
>> How would the proposed solution work with these workloads?
>>
>> - A similar question related to the above also applies to caching (I
>> know caching is the subject of FLIP-429; asking here because of the related
>> section in this FLIP).
>>
>> Regards,
>> Jeyhun
>>
>> [1] https://blog.min.io/challenge-big-data-small-files/
>>
>>
>>
>> On Thu, Mar 7, 2024 at 10:09 AM Hangxiang Yu  wrote:
>>
>> > Hi devs,
>> >
>> >
>> > I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
>> > State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
>> > Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
>> >
>> > - FLIP-427: Disaggregated State Store
>> >
>> > This FLIP introduces the initial version of the ForSt disaggregated state
>> > store.
>> >
>> > Please make sure you have read FLIP-423[1] to know the whole story, and
>> > we'll discuss the details of FLIP-427[2] under this mail. For the
>> > discussion of the overall architecture or topics related to multiple
>> > sub-FLIPs, please post in the previous mail[3].
>> >
>> > Looking forward to hearing from you!
>> >
>> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
>> >
>> > [2] https://cwiki.apache.org/confluence/x/T4p3EQ
>> >
>> > [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
>> >
>> >
>> > Best,
>> >
>> > Hangxiang.
>> >
>>
>
>
> --
> Best,
> Hangxiang.
>


-- 
Best,
Hangxiang.


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-10 Thread Hangxiang Yu
Hi, Jeyhun.

Thanks for the reply.

Is this argument true for all workloads? Or does this argument also hold
for workloads with many small files, which is quite a common case [1]?

Yes, I think so. The overhead should still be considered negligible,
particularly in comparison to remote I/O, and other benefits of this
proposal may be more significant than this one.

Additionally, there is JNI overhead when Flink calls RocksDB methods
currently. The frequency of these calls could surpass that of actual file
system interface calls, given that not all state requests require accessing
the file system.

BTW, the issue with small files can also impact the performance of the DB with
the local file system at runtime, so we usually resolve this first in the
production environment.

the engine spawns a huge number of scan-range requests to the
file system to retrieve different parts of a file.

Indeed, frequent requests to the remote file system can significantly
affect performance. To address this, other FLIPs have introduced various
strategies:

1. Local disk cache to minimize remote requests as described in FLIP-423,
which we will introduce in FLIP-429 as you mentioned. With effective cache
utilization, the performance will not be inferior to the local strategy
when the cache hits.

2. Grouping remote access to decrease the number of remote I/O requests, as
proposed in "FLIP-426: Grouping Remote State Access."

3. Parallel I/O to maximize network bandwidth usage, outlined in "FLIP-425:
Asynchronous Execution Model."

The PoC implements a simple file cache and asynchronous execution, which
improve the performance a lot. You could also refer to the PoC results in
FLIP-423.
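To illustrate the first strategy, a minimal sketch of a pass-through local
file cache (names are illustrative; FLIP-429 defines the real cache design):

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.ConcurrentHashMap;

public class LocalFileCache {
    private final File cacheDir;
    private final ConcurrentHashMap<String, File> cached = new ConcurrentHashMap<>();

    public LocalFileCache(File cacheDir) {
        this.cacheDir = cacheDir;
    }

    // On a cache miss, copy the remote file to local disk once; afterwards
    // all reads are served from the local copy, avoiding remote I/O entirely.
    public FSDataInputStream open(Path remote) throws IOException {
        File local = cached.computeIfAbsent(remote.toString(), k -> download(remote));
        Path localPath = new Path(local.toURI());
        return localPath.getFileSystem().open(localPath);
    }

    private File download(Path remote) {
        try {
            File local = new File(cacheDir, Integer.toHexString(remote.toString().hashCode()));
            try (FSDataInputStream in = remote.getFileSystem().open(remote)) {
                Files.copy(in, local.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
            return local;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

SST files are immutable once written, which is what makes this simple
copy-once scheme safe for the read path.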

On Mon, Mar 11, 2024 at 3:11 AM Jeyhun Karimov  wrote:

> Hi Hangxiang,
>
> Thanks for the proposal. +1 for it.
> I have a few comments.
>
> Proposal 2 has additional JNI overhead, but the overhead is relatively
> > negligible when weighed against the latency of remote I/O.
>
> - Is this argument true for all workloads? Or does this argument also hold
> for workloads with many small files, which is quite a common case [1]?
>
> - Also, in many workloads the engine does not need the whole file, either
> because the query forces it, or because the file type supports efficient
> filtering (e.g. ORC, Parquet, Arrow files), or simply because one file is
> "divided" among multiple workers.
> In these cases, the engine spawns a huge number of scan-range requests to the
> file system to retrieve different parts of a file.
> How would the proposed solution work with these workloads?
>
> - A similar question related to the above also applies to caching (I
> know caching is the subject of FLIP-429; asking here because of the related
> section in this FLIP).
>
> Regards,
> Jeyhun
>
> [1] https://blog.min.io/challenge-big-data-small-files/
>
>
>
> On Thu, Mar 7, 2024 at 10:09 AM Hangxiang Yu  wrote:
>
> > Hi devs,
> >
> >
> > I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> > State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
> > Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
> >
> > - FLIP-427: Disaggregated State Store
> >
> > This FLIP introduces the initial version of the ForSt disaggregated state
> > store.
> >
> > Please make sure you have read FLIP-423[1] to know the whole story, and
> > we'll discuss the details of FLIP-427[2] under this mail. For the
> > discussion of the overall architecture or topics related to multiple
> > sub-FLIPs, please post in the previous mail[3].
> >
> > Looking forward to hearing from you!
> >
> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> >
> > [2] https://cwiki.apache.org/confluence/x/T4p3EQ
> >
> > [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> >
> >
> > Best,
> >
> > Hangxiang.
> >
>


-- 
Best,
Hangxiang.


Re: [DISCUSS] FLIP-427: Disaggregated State Store

2024-03-10 Thread Jeyhun Karimov
Hi Hangxiang,

Thanks for the proposal. +1 for it.
I have a few comments.

Proposal 2 has additional JNI overhead, but the overhead is relatively
> negligible when weighed against the latency of remote I/O.

- Is this argument true for all workloads? Or does this argument also hold
for workloads with many small files, which is quite a common case [1]?

- Also, in many workloads the engine does not need the whole file, either
because the query forces it, or because the file type supports efficient
filtering (e.g. ORC, Parquet, Arrow files), or simply because one file is
"divided" among multiple workers.
In these cases, the engine spawns a huge number of scan-range requests to the
file system to retrieve different parts of a file.
How would the proposed solution work with these workloads?

- A similar question related to the above also applies to caching (I
know caching is the subject of FLIP-429; asking here because of the related
section in this FLIP).

Regards,
Jeyhun

[1] https://blog.min.io/challenge-big-data-small-files/



On Thu, Mar 7, 2024 at 10:09 AM Hangxiang Yu  wrote:

> Hi devs,
>
>
> I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
> State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
> Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:
>
> - FLIP-427: Disaggregated State Store
>
> This FLIP introduces the initial version of the ForSt disaggregated state
> store.
>
> Please make sure you have read FLIP-423[1] to know the whole story, and
> we'll discuss the details of FLIP-427[2] under this mail. For the
> discussion of the overall architecture or topics related to multiple
> sub-FLIPs, please post in the previous mail[3].
>
> Looking forward to hearing from you!
>
> [1] https://cwiki.apache.org/confluence/x/R4p3EQ
>
> [2] https://cwiki.apache.org/confluence/x/T4p3EQ
>
> [3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
>
>
> Best,
>
> Hangxiang.
>


[DISCUSS] FLIP-427: Disaggregated State Store

2024-03-07 Thread Hangxiang Yu
Hi devs,


I'd like to start a discussion on a sub-FLIP of FLIP-423: Disaggregated
State Storage and Management[1], which is a joint work of Yuan Mei, Zakelly
Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang:

- FLIP-427: Disaggregated State Store

This FLIP introduces the initial version of the ForSt disaggregated state
store.

Please make sure you have read FLIP-423[1] to know the whole story, and
we'll discuss the details of FLIP-427[2] under this mail. For the
discussion of the overall architecture or topics related to multiple
sub-FLIPs, please post in the previous mail[3].

Looking forward to hearing from you!

[1] https://cwiki.apache.org/confluence/x/R4p3EQ

[2] https://cwiki.apache.org/confluence/x/T4p3EQ

[3] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0


Best,

Hangxiang.