Hi Zakelly, Soumitra,

Thanks for the proposal and the discussion! I have several questions:

1. Is it possible that reads will become more costly in some scenarios?
As I understand it, on compaction RocksDB would need to read and write back the same state as in the current implementation, so the benefit comes from deferring and "batching" those read/write operations.

Current approach
  add: read + deserialize + aggregate + serialize + write (IO#1)
  get: read + deserialize (IO#2)

Proposed approach
  add: serialize + write (IO#1)
  compact: read + deserialize + aggregate + serialize + write (IO#2)
  get: read + deserialize (IO#3)

But if adds are mostly followed by gets, then there's more IO, because RocksDB needs to make an additional call via JNI before serving reads, right?
If that's the case, do we need two implementations, selecting one depending on the workload?

2. Speaking more generally, could you list the motivating use cases, e.g. existing Flink DataStream operators, custom operators, SQL operators?
If the impact depends on the operator, it would be helpful to understand how the existing operators would be affected.

3. Could you clarify how the Reduce/Aggregate function calls will be dispatched?
In theory, if an operator has two such states, their states will end up in the same RocksDB instance and we need to distinguish them, right?

4. Function dispatch during restore
IIRC, aggregate functions might not be available during the restore operation, because the state descriptors containing these functions are set later. Is this a concern? Do we need to disable compactions during restore?

5. Remote compactions
In light of the Disaggregated State Backend, Flink might benefit from remote compactions. This may help with predictability (no compaction impact on processing) and with achieving better cost/utilization. AFAIK ForSt doesn't support it ATM, but it would be great to keep this possibility open. And I think the current proposal will make remote compactions problematic.

6. Alternative: ListState
Have you considered using ListState to accumulate the records, and then reduce/aggregate on read (or incrementally on write)? Basically, doing the same thing as you proposed but purely in Java.
I think it should address all the concerns I mentioned above; plus, it
- removes the associative requirement (because we control the order)
- removes the RocksDB dependency
- gives some flexibility in when to aggregate (incrementally on write and/or on read)
(I couldn't find this in the Rejected Alternatives, sorry if I missed it.)

Regards,
Roman

On Tue, May 26, 2026 at 3:50 PM Zakelly Lan <[email protected]> wrote:

> Hi Soumitra,
>
> I'm somehow getting confused. I was expecting there is no public API change in Flink side, and users could use the original state descriptors to create ReducingState/AggregatingState. And in the implementation side, taking RocksDBReducingState as example, a wrapper of user defined ReducingFunction that extends the AbstractAssociativeMergeOperator is provided to the frocksdb side. This may be a simple way of optimization, offering no additional new use cases. I may have missed some information, so please correct me if I'm wrong.
>
> And BTW, CC to @roman <[email protected]> and @Yuan Mei <[email protected]> as you may be interested in this topic.
>
> Best,
> Zakelly
>
> On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <[email protected]> wrote:
>
>> Hi Zakelly,
>>
>> I have removed the ReducingMergeState.set method to minimize the changes to public APIs. The modified FLIP is at the same -
>> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>>
>> Community members,
>> This proposal is a great addition to FrocksDB/Flink to enable applications to harness the power of RocksDB. This addition will help write-heavy workloads and let users build associative data structures in a streaming fashion.
>>
>> Please review, and I can answer any question.
>>
>> Best,
>> -Soumitra.
>>
>> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]> wrote:
>>
>> > Hi Zakelly,
>> >
>> > Thanks for your review and +1!
>> >
>> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]> wrote:
>> >
>> >> Thanks for the FLIP! I'd +1 for the direction of enhancing the reducing and aggregating state. It's important that we could leverage Rocksdb's merge operators to eliminate unnecessary `get()`. However I have a few questions:
>> >>
>> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` in frocksdb side, so how could user pass this instance to the RocksDB State backend and what is the relationship with the Flink's `ReduceFunction` or `AggregateFunction`. I would suggest we may 'translate' user's `ReduceFunction` into some frocksdb's merge operator, thus for flink we still maintain the original experience. WDYT?
>> >>
>> >
>> > Yes, that is the whole idea. Since ReduceFunction and AggregateFunction are existing primitives and they are associative and serializable, I am building on them.
>> >
>> > This is how they are wired. There are two new classes RocksDBReducingMergeOperator, RocksDBAggregatingMergeOperator implementing AbstractAssociativeMergeOperator in Flink. The ColumnFamily is configured with one of them. RocksDBReducingMergeOperator for ReducingMergeState, and RocksDBAggregatingMergeOperator for AggregatingMergeState types. These classes get the callback from frocksdb, handle the serde, and call the user defined ReduceFunction and AggregateFunction.
>> >
>> >>
>> >> 2. I read you introduce a new `ReducingMergeState` with a new `set()` method compared with `ReducingState`, is this necessary? I mean if we intend to optimize performance solely through the use of a merge operator, this is not necessary, right? I do not recommend introducing too many public APIs, as this would force us to consider their semantics. Specifically for example, in changelog stream processing, if the `merge` operation were to permit state-setting operations, it would complicate potential future retraction (or reverse merge/aggregating) operations. WDYT?
>> >>
>> >
>> > ReducingMergeState.set is just a shortcut for clear-add. I agree with your point, I will remove it.
>> >
>> > Best,
>> > -Soumitra.
>> >
>> >
>> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]> wrote:
>> >>
>> >> > Dear Community Members,
>> >> >
>> >> > I want to start discussion on the two tickets I filed recently:
>> >> > Add support for Java based AssociativeMergeOperator via JNI
>> >> > <https://issues.apache.org/jira/browse/FLINK-39455>
>> >> > Support ReducingMergeState and AggregatingMergeState backed by Java based associative merge operators
>> >> > <https://issues.apache.org/jira/browse/FLINK-39456>
>> >> >
>> >> > Copying the motivation from the FLIP doc
>> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>> >> >
>> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState state variables that do a synchronous read-modify-write on every add call. While this works great in many scenarios, for write-heavy workloads this can be expensive and may become a bottleneck.
>> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed for commutative and associative operations: integer counters, set union, list append, approximate sketches, top-K structures, Bloom filter, and similar patterns. However, frocksdb (the RocksDB fork used in Flink) does not support Java based associative merge operators.
>> >> >
>> >> > This FLIP has two parts:
>> >> > 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
>> >> > 2. Support ReducingMergeState and AggregatingMergeState backed by Java based associative merge operators
>> >> >
>> >> > The first part proposes exposing the associative merge operator as a Java class in frocksdb with minimal JNI overhead. RocksDB can call these operators during flushing and compaction.
>> >> > The second part leverages the frocksdb support developed in the first part to support ReducingMergeState and AggregatingMergeState state variables with user defined ReduceFunction and AggregateFunction using rocksdb backend.
>> >> >
>> >> > This enhancement opens up a powerful feature of rocksdb to Java. Flink users can use it to build interesting associative data structures on streaming data. I have added benchmark details from a prototype implementation in the FLIP doc.
>> >> >
>> >> > Looking forward to feedback.
>> >> >
>> >> > FLIP in Google doc
>> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>> >> >
>> >> > Best,
>> >> > -Soumitra.
>> >> >
>> >>
>> >
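
To illustrate the ListState alternative from point 6 at the top of this message, here is a minimal sketch in plain Java. It is not Flink code: an in-memory ArrayList stands in for ListState, a BinaryOperator stands in for the user's ReduceFunction, and the names ListBackedReducingState, compactThreshold and pendingSize are hypothetical, introduced only for this example. It shows the "append on write, fold on read (or incrementally on write)" idea, and says nothing about serialization or checkpoint cost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Hypothetical stand-in for a reducing state built on top of a list.
 * The ArrayList plays the role of Flink's ListState; no Flink or RocksDB
 * API is used here.
 */
final class ListBackedReducingState<T> {
    private final BinaryOperator<T> reduceFunction; // stand-in for the user's ReduceFunction
    private final int compactThreshold;             // assumed knob: fold once this many entries pile up
    private final List<T> pending = new ArrayList<>();

    ListBackedReducingState(BinaryOperator<T> reduceFunction, int compactThreshold) {
        if (compactThreshold < 1) {
            throw new IllegalArgumentException("compactThreshold must be >= 1");
        }
        this.reduceFunction = reduceFunction;
        this.compactThreshold = compactThreshold;
    }

    /** Write path: append only, no read-modify-write unless the threshold is hit. */
    void add(T value) {
        pending.add(value);
        if (pending.size() >= compactThreshold) {
            compact();
        }
    }

    /** Read path: fold whatever is pending, then return the single remaining value (null if empty). */
    T get() {
        compact();
        return pending.isEmpty() ? null : pending.get(0);
    }

    /** Number of entries currently waiting to be folded. */
    int pendingSize() {
        return pending.size();
    }

    /** Left fold in arrival order, so the function does not have to be associative. */
    private void compact() {
        if (pending.size() <= 1) {
            return;
        }
        T acc = pending.get(0);
        for (int i = 1; i < pending.size(); i++) {
            acc = reduceFunction.apply(acc, pending.get(i));
        }
        pending.clear();
        pending.add(acc);
    }
}
```

With `new ListBackedReducingState<>(Integer::sum, 4)`, adding 1 through 4 triggers one fold on the fourth add, a further add of 5 is only appended, and the next `get()` folds again and returns 15. Because the fold always runs left to right in arrival order, a non-commutative function such as string concatenation also gives a deterministic result.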
