Hi Yang,

Thanks for the great proposal!


I'm particularly interested in the exactly-once guarantee mechanism in the 
proposal. If I were to design the solution, my first idea would be to 
leverage RocksDB's two-phase commit transaction protocol [1], which I think 
should simplify the recovery process. To further avoid lost updates, we 
could use the pessimistic transaction mode [2], which acquires locks 
automatically. However, I think the primary challenge with this approach 
lies in reconciling data visibility with Fluss's emphasis on real-time 
processing. What do you think?
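To make the idea concrete, here is a toy sketch of the prepare/commit flow I have in mind (pure Python with made-up names; not the actual RocksDB API, just a model of the pessimistic-lock + two-phase-commit interaction):

```python
# Toy model: pessimistic locks are taken on first write, and prepare()
# persists the write batch so recovery can deterministically re-commit
# or roll back. Names are hypothetical, not RocksDB's.
import threading

class ToyTxnStore:
    def __init__(self):
        self.data = {}      # committed state
        self.locks = {}     # key -> owning txn id (pessimistic locks)
        self.prepared = {}  # txn id -> durable write batch (phase 1)
        self._mu = threading.Lock()

    def begin(self, txn_id):
        return ToyTxn(self, txn_id)

class ToyTxn:
    def __init__(self, store, txn_id):
        self.store, self.txn_id, self.batch = store, txn_id, {}

    def put(self, key, value):
        # Pessimistic mode: acquire the row lock up front, so a
        # concurrent writer fails fast instead of losing an update.
        with self.store._mu:
            owner = self.store.locks.get(key)
            if owner not in (None, self.txn_id):
                raise RuntimeError(f"lock on {key!r} held by {owner}")
            self.store.locks[key] = self.txn_id
        self.batch[key] = value

    def prepare(self):
        # Phase 1: make the batch durable; recovery can now finish
        # the commit instead of replaying the writer's logic.
        self.store.prepared[self.txn_id] = dict(self.batch)

    def commit(self):
        # Phase 2: apply the prepared batch, then release locks.
        for k, v in self.store.prepared.pop(self.txn_id).items():
            self.store.data[k] = v
        self._release()

    def rollback(self):
        self.store.prepared.pop(self.txn_id, None)
        self._release()

    def _release(self):
        with self.store._mu:
            for k in list(self.store.locks):
                if self.store.locks[k] == self.txn_id:
                    del self.store.locks[k]
```

The point of phase 1 is that after a crash, any transaction found in the prepared set can simply be re-committed, which is the recovery simplification I was alluding to.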


[1] https://github.com/facebook/rocksdb/wiki/Two-Phase-Commit-Implementation
[2] https://github.com/facebook/rocksdb/wiki/Transactions



Regards,
Cheng Wang



 




------------------ Original ------------------
From: "dev" <[email protected]>
Date: Fri, Dec 5, 2025 10:45 AM
To: "dev" <[email protected]>
Subject: [DISCUSS] FIP-21: Aggregation Merge Engine



Hi all,

Aggregation computation is one of the most common requirements in real-time
data processing, widely used in scenarios like real-time OLAP, reporting,
and monitoring. However, the current approach of performing aggregation at
the Flink compute layer faces significant state management challenges that
limit system scalability and performance.

As the number of unique keys grows, Flink's aggregation state expands
linearly, leading to several critical issues:
- State bloat consuming massive memory and disk resources
- Long checkpoint durations affecting RTO and causing backpressure
- Expensive state redistribution making scaling difficult
- Limited data scale a single job can handle

By pushing aggregation down to the storage engine layer, we can
fundamentally address these issues. Aggregation state would migrate from
Flink State Backend to Fluss storage, making Flink jobs nearly stateless.
This approach leverages Fluss's LSM structure for efficient storage,
enables low-latency primary key queries on aggregation results, and
significantly reduces resource consumption.
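As a purely illustrative sketch (these are not Fluss's actual interfaces), the core idea is that overlapping LSM entries for the same primary key are folded with a per-field aggregate function instead of last-write-wins:

```python
# Illustrative only: fold LSM runs (oldest first) into one row per
# primary key, combining fields with configured aggregate functions
# rather than keeping the latest row.
AGG_FUNCS = {
    "sum": lambda a, b: a + b,
    "max": max,
    "min": min,
}

def merge_runs(runs, field_aggs):
    """runs: list of {key: {field: value}}, oldest run first.
    field_aggs: per-field aggregate name, e.g. {"clicks": "sum"}."""
    merged = {}
    for run in runs:
        for key, row in run.items():
            if key not in merged:
                merged[key] = dict(row)
            else:
                for field, value in row.items():
                    agg = AGG_FUNCS[field_aggs.get(field, "sum")]
                    merged[key][field] = agg(merged[key][field], value)
    return merged
```

For example, with `{"clicks": "sum", "max_lat": "max"}`, two entries for the same user are merged into a single aggregated row at read or compaction time, so the Flink job never has to hold that state itself.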

Additionally, Apache Paimon has already implemented a comprehensive
Aggregation Merge Engine. Since Fluss serves as Flink's streaming storage
layer and collaborates with Paimon in the stream-lake integration
architecture, it should align with this core capability so that users can
seamlessly switch between stream tables and lake tables.

To implement this, I'd like to propose FIP-21: Aggregation Merge Engine [1].

This proposal introduces:
- An extensible aggregation framework supporting 11 common functions (sum,
max, min, product, listagg, bool_and, bool_or, etc.) with field-level
configuration
- Exactly-once semantics through a changelog-based undo recovery mechanism
- Column lock coordination enabling multi-job concurrent writes without
conflicts
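To give a rough intuition for the undo mechanism (the names below are mine, not the FIP's): before applying each aggregated update, the previous row value is recorded to a changelog; if a writer fails mid-epoch, replaying the log in reverse restores the pre-epoch state before the job retries:

```python
# Hedged sketch of changelog-based undo for a sum aggregation.
# Not the FIP's actual design, just the recovery intuition.

def apply_with_undo(table, undo_log, key, delta):
    undo_log.append((key, table.get(key)))   # record old value first
    table[key] = table.get(key, 0) + delta   # apply sum aggregation

def rollback(table, undo_log):
    # Replay undo entries newest-first to restore pre-epoch state.
    for key, old in reversed(undo_log):
        if old is None:
            table.pop(key, None)             # key did not exist before
        else:
            table[key] = old
    undo_log.clear()
```

Because the pre-update values are durable before the aggregation is applied, a retried epoch starts from a clean state and deltas are never double-counted, which is how I read the exactly-once claim.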

Any feedback and suggestions are welcome!

[1]:
https://cwiki.apache.org/confluence/display/FLUSS/FIP-21%3A+Aggregation+Merge+Engine

Best regards,
Yang
