vinothchandar commented on code in PR #17610:
URL: https://github.com/apache/hudi/pull/17610#discussion_r2684656506


##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;

Review Comment:
   ```suggestion
   - a cache of RLI would be introduced to speed up access, along with a cache invalidation mechanism to keep it consistent with the committed state of the table.
   ```
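
To make the suggested invalidation mechanism concrete, here is a minimal, hypothetical sketch (illustrative names, not actual Hudi classes): a commit callback drops any cached record-key locations the committed instant touched, so subsequent probes re-read the committed RLI state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not actual Hudi code: a record-key location cache
// whose entries are invalidated when the instant that rewrote them commits.
public class InvalidatingRliCache {
  private final Map<String, String> keyToFileId = new HashMap<>();

  public void put(String recordKey, String fileId) {
    keyToFileId.put(recordKey, fileId);
  }

  public String get(String recordKey) {
    return keyToFileId.get(recordKey); // null -> fall back to the MDT RLI lookup
  }

  // Invoked from a commit callback with the record keys the instant touched.
  public void invalidateOnCommit(Set<String> touchedKeys) {
    keyToFileId.keySet().removeAll(touchedKeys);
  }
}
```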



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;
+- a separate index function to write the RLI/SI payloads;

Review Comment:
   ```suggestion
   - a separate Flink index function to write the RLI/SI payloads;
   ```



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation

Review Comment:
   ```suggestion
   ## High Level Design
   ```



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;
+- a separate index function to write the RLI/SI payloads;
+- the MDT RLI files is written synchronously with the data table data files, 
the metadata is sent to the coordinator for a final commit to the MDT(after 
`FILES` partition is ready);
+- the MDT compaction is switched to be async and the data files compaction 
pipeline is reused for less take up of task slots.
+
+### The Write
+
+### The RLI Access
+In `BucketAssigner` operator, the RLI index metadata would be utilized as the 
index backend, the `BucketAssigner` operator will probe the RLI with the 
incoming record keys to figure out whether msg is update or insert or delete.
+In other words, the RLI index metadata will serve as the same role of the 
`flink_state` index.
+
+#### The Cache of RLI Access
+We need fast access in streaming to have high throughput(ideally per record 
access should be < 10ms), thus a general hotspot cache is needed. We will build 
a in-memory LRU cache by the active upsert records keys, the cache items will 
be force evictted by a configured memory threshold.

Review Comment:
   10ms gives us 100 records/sec throughput per shard. What do we really mean by this? Let's avoid numbers like this unless we are really sure.
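
For reference, the forced-eviction hotspot cache the hunk describes could be sketched with an access-ordered `LinkedHashMap` (a hypothetical class name, and an entry count stands in for the configured memory threshold):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not actual Hudi code: an access-ordered LinkedHashMap
// behaves as an LRU structure, force-evicting the least-recently-used
// record-key locations once the configured capacity is exceeded.
public class RliHotspotCache {
  private final Map<String, String> keyToFileId;

  public RliHotspotCache(int maxEntries) {
    // accessOrder=true reorders entries on access, giving LRU eviction order.
    this.keyToFileId = new LinkedHashMap<>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > maxEntries;
      }
    };
  }

  public void put(String recordKey, String fileId) {
    keyToFileId.put(recordKey, fileId);
  }

  public String get(String recordKey) {
    return keyToFileId.get(recordKey); // null -> fall through to the MDT RLI
  }

  public int size() {
    return keyToFileId.size();
  }
}
```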



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,116 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI support for Flink streaming
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly set limits for the kind of write throughput supported by RLI (based 
on certain average response time for the RLI access, like from x0ms to x00ms) 
via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;
+- a separate index function to write the RLI/SI payloads;
+- the MDT RLI files is written synchronously with the data table data files, 
the metadata is sent to the coordinator for a final commit to the MDT(after 
`FILES` partition is ready);
+- the MDT compaction is switched to be async and the data files compaction 
pipeline is reused for less take up of task slots.
+
+### The Write
+
+### The RLI Access
+In `BucketAssigner` operator, the RLI index metadata would be utilized as the 
index backend, the `BucketAssigner` operator will probe the RLI with the 
incoming record keys to figure out whether msg is update or insert or delete.
+In other words, the RLI index metadata will serve as the same role of the 
`flink_state` index.
+
+#### The Cache of RLI Access
+We need fast access in streaming to have high throughput(ideally per record 
access should be < 10ms), thus a general hotspot cache is needed. We will build 
a in-memory LRU cache by the active upsert records keys, the cache items will 
be force evictted by a configured memory threshold.
+
+We also need a memory cache for the index mappings of current checkpoint 
because it is not committed to Hudi table yet so invisible.

Review Comment:
   Can you please explain this problem in more detail than one sentence, and in even more detail in comments? I am not following what the issue is.
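
The quoted paragraph seems to describe a two-level lookup: mappings written during the in-flight checkpoint are not yet visible in the committed RLI, so they are kept in a side map that shadows the committed view and is dropped once the instant commits. A hypothetical sketch, with plain maps standing in for the committed RLI and the checkpoint-scoped cache:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not actual Hudi code: index mappings from the current,
// uncommitted checkpoint shadow the committed RLI view, and are merged and
// cleared once the checkpoint's instant commits to the Hudi table.
public class CheckpointOverlayIndex {
  private final Map<String, String> committed = new HashMap<>(); // stands in for the committed RLI
  private final Map<String, String> inFlight = new HashMap<>();  // current-checkpoint mappings

  public void onCommittedMapping(String recordKey, String fileId) {
    committed.put(recordKey, fileId);
  }

  public void onInFlightWrite(String recordKey, String fileId) {
    inFlight.put(recordKey, fileId);
  }

  public String locate(String recordKey) {
    String loc = inFlight.get(recordKey); // in-flight mappings take precedence
    return loc != null ? loc : committed.get(recordKey);
  }

  public void onCheckpointCommitted() {
    committed.putAll(inFlight); // index payloads are now visible in the RLI
    inFlight.clear();
  }
}
```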



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.

Review Comment:
   I would love for us to spell out actual issues like this, such that even a 
new reader can understand. @danny0405 Please make a pass over the entire RFC 
like this, before this comment is resolved. 



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;

Review Comment:
   ```suggestion
   - an RLI based index backend will be added, which can be used in place of the current flink_state index;
   ```



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,116 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI support for Flink streaming
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly set limits for the kind of write throughput supported by RLI (based 
on certain average response time for the RLI access, like from x0ms to x00ms) 
via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;
+- a separate index function to write the RLI/SI payloads;
+- the MDT RLI files is written synchronously with the data table data files, 
the metadata is sent to the coordinator for a final commit to the MDT(after 
`FILES` partition is ready);
+- the MDT compaction is switched to be async and the data files compaction 
pipeline is reused for less take up of task slots.
+
+### The Write
+
+### The RLI Access
+In `BucketAssigner` operator, the RLI index metadata would be utilized as the 
index backend, the `BucketAssigner` operator will probe the RLI with the 
incoming record keys to figure out whether msg is update or insert or delete.

Review Comment:
   Let's do things consistently with the Spark writer, and we should plan on supporting both partitioned and global RLI.
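
The probe described in the quoted hunk amounts to the following classification (a hypothetical sketch; the lookup function stands in for the MDT RLI access, and nothing here is specific to partitioned vs global behavior):

```java
import java.util.function.Function;

// Hypothetical sketch, not the actual BucketAssigner code: probe the RLI with
// the incoming record key to decide whether the message is an insert, an
// update, or a delete against an existing file group.
public class RliProbe {
  public enum Op { INSERT, UPDATE, DELETE }

  private final Function<String, String> rliLookup; // recordKey -> fileId, null if absent

  public RliProbe(Function<String, String> rliLookup) {
    this.rliLookup = rliLookup;
  }

  public Op classify(String recordKey, boolean isDeleteMessage) {
    String location = rliLookup.apply(recordKey);
    if (isDeleteMessage) {
      return Op.DELETE; // routed to the file group holding the key, if any
    }
    return location == null ? Op.INSERT : Op.UPDATE;
  }
}
```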



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;
+- a separate index function to write the RLI/SI payloads;
+- the MDT RLI files is written synchronously with the data table data files, 
the metadata is sent to the coordinator for a final commit to the MDT(after 
`FILES` partition is ready);
+- the MDT compaction is switched to be async and the data files compaction 
pipeline is reused for less take up of task slots.
+
+### The Write
+
+### The RLI Access
+In `BucketAssigner` operator, the RLI index metadata would be utilized as the 
index backend, the `BucketAssigner` operator will probe the RLI with the 
incoming record keys to figure out whether msg is update or insert or delete.
+In other words, the RLI index metadata will serve as the same role of the 
`flink_state` index.
+
+#### The Cache of RLI Access
+We need fast access in streaming to have high throughput(ideally per record 
access should be < 10ms), thus a general hotspot cache is needed. We will build 
a in-memory LRU cache by the active upsert records keys, the cache items will 
be force evictted by a configured memory threshold.
+
+We also need a memory cache for the index mappings of current checkpoint 
because it is not committed to Hudi table yet so invisible.
+This cache can be cleaned once the checkpoint/instant is committed to 
Hudi(indicates that the index payloads are also committed).
+
+On job restart or task failover, there is use case that the checkpoint 
succeeds on Flink while the instant is not committed to Hudi, for DT metadata, 
the pipeline will recommit

Review Comment:
   Pull this into a different section.



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Impl reliable and performant write and read support for RLI via Flink APIs;
+- The RLI impl is engines compatible, for e.g, Flink can access and utilize 
the RLI written by Spark and vice versa;
+- The RLI is global, upserts among partitions is supported; Also support 
partition level RLI for large fact tables;
+- Async compaction for MDT when RLI is enabled; in writer pipeline or table 
services background job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by 
RLI (based on certain average response time for the RLI access, like from x0ms 
to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while spark does, this caused inconsistency between 
engines, for tables migrated from Spark to flink streaming, the index type 
needs to be switched to either bucket or flink_state , this caused a overhead 
for users in production.
+
+Another reason is for multiple partition upserts, currently the only choice is 
flink_state index, but the flink_state actually costs a lot of memory and can 
not be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- a RLI based index backend will be there to replace the flink_state index;
+- a cache of RLI would be introduced to speed the access;
+- a separate index function to write the RLI/SI payloads;
+- the MDT RLI files is written synchronously with the data table data files, 
the metadata is sent to the coordinator for a final commit to the MDT(after 
`FILES` partition is ready);

Review Comment:
   @danny0405 please fix the entire RFC so that you don't loosely refer to RLI when you really mean RLI and SI; it gets very confusing to read. Please be precise. Let's keep this comment open until it's addressed across the board.
   



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+### The Write

Review Comment:
   ```suggestion
   ### Detailed Design
   ```



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.

Review Comment:
   ```suggestion
   Another motivation is scalable, efficient support for cross-partition updates (where the partition path of the record changes). Currently, the only choice is the flink_state index, which can be costly in such a scenario because it holds state proportional to the size of the table; flink_state can use a lot of memory and cannot be shared between different workloads.
   ```



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+### The Write
+
+### The RLI Access
+In the `BucketAssigner` operator, the RLI index metadata is utilized as the index backend: the `BucketAssigner` operator probes the RLI with the incoming record keys to figure out whether a message is an update, insert, or delete.
+In other words, the RLI index metadata serves the same role as the `flink_state` index.
+
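The probe-and-classify step above can be sketched as follows. All class and method names here are illustrative, not Hudi APIs; the real operator would consult the MDT RLI rather than an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: classify an incoming message by probing a key -> location
// mapping that stands in for the MDT RLI. Names are illustrative only.
public class RliProbeSketch {
    enum Kind { INSERT, UPDATE, DELETE }

    // record key -> file group location, as an RLI probe would return it
    private final Map<String, String> rli = new HashMap<>();

    Kind classify(String recordKey, boolean isDeleteMessage) {
        String location = rli.get(recordKey); // the RLI probe
        if (isDeleteMessage) {
            return Kind.DELETE;               // delete routed to the existing location, if any
        }
        return location == null ? Kind.INSERT : Kind.UPDATE;
    }

    void commit(String recordKey, String fileId) {
        rli.put(recordKey, fileId);           // mapping becomes visible only after commit
    }

    public static void main(String[] args) {
        RliProbeSketch probe = new RliProbeSketch();
        System.out.println(probe.classify("key-1", false)); // first sight: INSERT
        probe.commit("key-1", "fg-0001");
        System.out.println(probe.classify("key-1", false)); // known key: UPDATE
    }
}
```

The point of the sketch is only the decision rule: a miss on the probe means insert, a hit means update, and deletes reuse the probed location.
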
+#### The Cache of RLI Access
+We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; cache items will be force-evicted based on a configured memory threshold.

Review Comment:
   Let's specify whether we are caching the `key -> location` mappings (record-level cache) or parts of the RLI shard (hfile, log file) (file-level cache). There are pros/cons to each approach.



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,116 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI support for Flink streaming
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly set limits for the kind of write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+### The Write
+
+### The RLI Access
+In the `BucketAssigner` operator, the RLI index metadata is utilized as the index backend: the `BucketAssigner` operator probes the RLI with the incoming record keys to figure out whether a message is an update, insert, or delete.
+In other words, the RLI index metadata serves the same role as the `flink_state` index.
+
+#### The Cache of RLI Access
+We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; cache items will be force-evicted based on a configured memory threshold.
+
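As a rough illustration of such a hotspot cache, a minimal sketch assuming a byte-budget eviction policy; the weighing function and the threshold here are simplified stand-ins for the configured memory threshold, and a real implementation would also handle key replacement and use proper size estimation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the hotspot cache: an access-ordered LRU over key -> location
// mappings, force-evicting entries once an approximate memory budget is
// exceeded. Illustrative only; not a Hudi class.
public class RliLruCache extends LinkedHashMap<String, String> {
    private final long maxWeightBytes;
    private long currentWeightBytes = 0;

    public RliLruCache(long maxWeightBytes) {
        super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
        this.maxWeightBytes = maxWeightBytes;
    }

    private static long weigh(String k, String v) {
        return 2L * (k.length() + v.length()); // rough UTF-16 byte estimate
    }

    @Override
    public String put(String key, String value) {
        // Sketch: does not discount the weight of a replaced value.
        currentWeightBytes += weigh(key, value);
        return super.put(key, value);
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        if (currentWeightBytes > maxWeightBytes) {
            currentWeightBytes -= weigh(eldest.getKey(), eldest.getValue());
            return true; // force-evict the least recently used entry
        }
        return false;
    }
}
```

`LinkedHashMap` with `accessOrder = true` keeps the least recently used entry first, so `removeEldestEntry` is a natural hook for the threshold-based eviction described above.
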
+We also need an in-memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
+This cache can be cleaned once the checkpoint/instant is committed to Hudi.
+
+On job restart or task failover, there is a case where the checkpoint succeeds on Flink while the instant is not yet committed to Hudi. For DT metadata, the pipeline will recommit the instant with the recovered table metadata, while for RLI access we need to include these special inflight instants in queries; basically, we need to support reading inflight instants on the MDT.
+
+The query will access the in-memory cache first, then the MDT RLI index:
+
+![The RLI Access Pattern](./rli-access-pattern.png)
+
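The access order in the figure can be sketched as a layered lookup. All names are illustrative; `mdtRli` stands in for a real MDT RLI reader, which per the discussion above would also need to surface inflight instants whose checkpoint already succeeded:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Sketch of the lookup order: uncommitted mappings of the current checkpoint
// first, then the hotspot cache, and only on a miss the MDT RLI itself.
public class RliLookupChain {
    private final Map<String, String> currentCheckpoint; // not yet committed
    private final Map<String, String> hotCache;          // LRU of committed mappings
    private final Function<String, Optional<String>> mdtRli; // remote MDT probe

    public RliLookupChain(Map<String, String> currentCheckpoint,
                          Map<String, String> hotCache,
                          Function<String, Optional<String>> mdtRli) {
        this.currentCheckpoint = currentCheckpoint;
        this.hotCache = hotCache;
        this.mdtRli = mdtRli;
    }

    public Optional<String> locate(String recordKey) {
        String loc = currentCheckpoint.get(recordKey);
        if (loc != null) return Optional.of(loc);
        loc = hotCache.get(recordKey);
        if (loc != null) return Optional.of(loc);
        Optional<String> fromMdt = mdtRli.apply(recordKey); // includes qualifying inflight instants
        fromMdt.ifPresent(l -> hotCache.put(recordKey, l)); // populate the cache on a hit
        return fromMdt;
    }
}
```
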
+### The Shuffle of RLI Payloads
+In the `StreamWrite` operator, the index items are inferred and sent to the `IndexWrite` operator in streaming style. The index records are shuffled by `hash(record_key) % num_rli_shards` (the same hashing algorithm as the MDT `RLI index` partitioner),

Review Comment:
   I want to reuse as many common configs as possible. Strictly no Flink-specific table configs.



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation

Review Comment:
   include diagrams?



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);

Review Comment:
   ```suggestion
   - the MDT RLI and SI files will be written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
   ```



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+### The Write
+
+### The RLI Access
+In the `BucketAssigner` operator, the RLI index metadata is utilized as the index backend: the `BucketAssigner` operator probes the RLI with the incoming record keys to figure out whether a message is an update, insert, or delete.
+In other words, the RLI index metadata serves the same role as the `flink_state` index.
+
+#### The Cache of RLI Access
+We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; cache items will be force-evicted based on a configured memory threshold.
+
+We also need an in-memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible.
+This cache can be cleaned once the checkpoint/instant is committed to Hudi (which indicates that the index payloads are also committed).

Review Comment:
   One line for cache invalidation is really insufficient :). This design basically says we are re-reading the RLI shard all over again on each commit?
   
   Can't we do a file-system-level cache where we just get the base file or log file downloaded on demand, reducing the amount of cache data read down to the executors on each commit?



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,116 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI support for Flink streaming
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly set limits for the kind of write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+### The Write
+
+### The RLI Access
+In the `BucketAssigner` operator, the RLI index metadata is utilized as the index backend: the `BucketAssigner` operator probes the RLI with the incoming record keys to figure out whether a message is an update, insert, or delete.

Review Comment:
   > should we shuffle different keys under the same file group of RLI in metadata table into the same sub-task
   
   We should be reading each RLI shard once per commit. Even with caching, it's a terrible idea not to group keys to tasks for RLI lookup, since it'll hurt cache efficiency: each node would then have to cache all RLI shards, which can be massive.



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,135 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is engine-compatible, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for the write throughput supported by RLI (based on a target average response time for RLI access, e.g., from x0ms to x00ms) via empirical benchmarks;
+- Ability to be extended to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does. This causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which creates overhead for users in production.
+
+Another reason is upserts across multiple partitions: currently the only choice is the flink_state index, but flink_state costs a lot of memory and cannot be shared between different workloads.
+
+## Implementation
+
+The high-level ideas:
+
+- an RLI-based index backend will replace the flink_state index;
+- a cache of the RLI will be introduced to speed up access;
+- a separate index function will write the RLI/SI payloads;
+- the MDT RLI files are written synchronously with the data table data files; the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+### The Write
+
+### The RLI Access
+In the `BucketAssigner` operator, the RLI index metadata is utilized as the index backend: the `BucketAssigner` operator probes the RLI with the incoming record keys to figure out whether a message is an update, insert, or delete.
+In other words, the RLI index metadata serves the same role as the `flink_state` index.
+
+#### The Cache of RLI Access
+We need fast access in streaming to achieve high throughput (ideally per-record access should be < 10ms), thus a general hotspot cache is needed. We will build an in-memory LRU cache keyed by the active upsert record keys; cache items will be force-evicted based on a configured memory threshold.
+
+We also need an in-memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible to queries.
+This cache can be cleared once the checkpoint/instant is committed to Hudi (which indicates that the index payloads are also committed).
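The lifecycle of this per-checkpoint cache can be sketched as follows (`InflightIndexCache` and its methods are hypothetical names for illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Mappings written under an inflight instant stay queryable locally until
// that instant commits to Hudi, at which point the MDT is authoritative
// and the local entries can be dropped.
class InflightIndexCache {
  // instantTime -> (recordKey -> fileId) written under that instant
  private final Map<String, Map<String, String>> inflight = new HashMap<>();

  void put(String instant, String recordKey, String fileId) {
    inflight.computeIfAbsent(instant, i -> new HashMap<>())
        .put(recordKey, fileId);
  }

  Optional<String> lookup(String recordKey) {
    return inflight.values().stream()
        .map(m -> m.get(recordKey))
        .filter(Objects::nonNull)
        .findFirst();
  }

  // Called once the instant is committed: mappings are now in the MDT.
  void onCommit(String instant) {
    inflight.remove(instant);
  }
}
```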
+
+On job restart or task failover, there is a case where the checkpoint succeeds on the Flink side while the instant is not yet committed to Hudi; for the DT metadata, the pipeline recommits the instant with the recovered table metadata. Because the `BucketAssigner` operator is upstream of the `StreamWrite` operator, there is a time gap before these inflight instants are recommitted,
+and we do not want to block the processing of `BucketAssigner` (waiting for the inflight instants to recommit successfully). The suggested solution is
+to include these special inflight instants in RLI access queries; basically, we need to support reading inflight instants on the MDT.
+These inflight instants are the ones whose corresponding checkpoint has succeeded; inflight instants without a successful checkpoint are not included.
+See more details in the appendix on job/task failover.
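The instant-visibility rule above can be expressed as a simple filter (a sketch under the stated assumptions; `readableInstants` and the three input sets are illustrative, not actual timeline APIs):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// RLI queries may read: all completed instants, plus the inflight instants
// whose Flink checkpoint already succeeded (they will be recommitted).
// Inflight instants without a successful checkpoint stay invisible.
class InstantFilter {
  static Set<String> readableInstants(Set<String> completed,
                                      Set<String> inflight,
                                      Set<String> checkpointSucceeded) {
    Set<String> result = new HashSet<>(completed);
    inflight.stream()
        .filter(checkpointSucceeded::contains)
        .forEach(result::add);
    return result;
  }
}
```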
+
+The query accesses the in-memory cache first, then the MDT RLI index:
+
+![The RLI Access Pattern](./rli-access-pattern.png)
+
+### The Shuffle of RLI/SI Payloads
+In the `StreamWrite` operator, the index items are inferred and sent to the `IndexWrite` operator in streaming style. The index records are shuffled by `hash(record_key) % num_rli_shards` (the same hashing algorithm as the MDT `RLI index` partitioner);
+this is critical to avoid writing `N*M` files to the MDT partition (`N` is the number of RLI partition buckets, `M` is the number of data table buckets involved in the current write).
+
+How do we ensure the data record and index record always belong to the same commit/checkpoint? The barrier flows together with the records in Flink, see
+[how-does-state-snapshotting-work](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work):
+when the `StreamWrite` operator receives a record, it emits the corresponding index record in the same `#processElement` call, so we always keep the binding between the two; in other words, no barrier can land between them.
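The ordering argument above can be sketched as follows (the `Collector` interface here is a simplified stand-in for the Flink APIs, and the `rli:` payload format is purely illustrative):

```java
// Because the data record and its derived index record are both emitted
// inside one processElement call, and a checkpoint barrier is processed
// between records rather than inside a call, the pair always falls on the
// same side of any barrier.
class StreamWriteSketch {
  interface Collector<T> { void collect(T t); }

  static void processElement(String dataRecord,
                             Collector<String> dataOut,
                             Collector<String> indexOut) {
    dataOut.collect(dataRecord);
    // Emitted in the same call: no barrier can split this pair.
    indexOut.collect("rli:" + dataRecord);
  }
}
```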

Review Comment:
   add example from GH discussion. 


