Hi Soumitra,

I'm a bit confused. I was expecting no public API change on the Flink side, so that users could keep using the original state descriptors to create ReducingState/AggregatingState. On the implementation side, taking RocksDBReducingState as an example, a wrapper around the user-defined ReduceFunction that extends AbstractAssociativeMergeOperator is provided to frocksdb. This may be a simple way to optimize, without offering any new use cases. I may have missed some information, so please correct me if I'm wrong.
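To make the wrapper idea concrete, here is a rough sketch of what I have in mind. It is only an illustration under assumptions: the `merge(key, existingValue, operand)` signature on `AbstractAssociativeMergeOperator` is my guess at the shape of the proposed frocksdb class, `ReducingMergeOperator` is a hypothetical name, `ReduceFunction` is redeclared locally as a stand-in for Flink's interface so the snippet compiles on its own, and the fixed 8-byte long encoding stands in for a real serializer.

```java
// Local stand-in for Flink's ReduceFunction<T>, redeclared so the sketch is self-contained.
interface ReduceFunction<T> {
    T reduce(T a, T b) throws Exception;
}

// ASSUMPTION: hypothetical shape of the proposed frocksdb class; the real signature may differ.
abstract class AbstractAssociativeMergeOperator {
    // existingValue is null when the key has no value yet.
    abstract byte[] merge(byte[] key, byte[] existingValue, byte[] operand);
}

// Hypothetical wrapper: deserialize both sides, apply the user function, serialize the result.
final class ReducingMergeOperator extends AbstractAssociativeMergeOperator {
    private final ReduceFunction<Long> reduceFunction;

    ReducingMergeOperator(ReduceFunction<Long> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    @Override
    byte[] merge(byte[] key, byte[] existingValue, byte[] operand) {
        if (existingValue == null) {
            // First value for this key: nothing to combine with.
            return operand;
        }
        try {
            long merged = reduceFunction.reduce(decode(existingValue), decode(operand));
            return encode(merged);
        } catch (Exception e) {
            throw new IllegalStateException("user ReduceFunction failed during merge", e);
        }
    }

    // Fixed-width big-endian encoding, standing in for a real type serializer.
    static byte[] encode(long value) {
        return java.nio.ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decode(byte[] bytes) {
        return java.nio.ByteBuffer.wrap(bytes).getLong();
    }
}
```

With something like this, the user still only supplies a ReduceFunction through the existing descriptor, and the state backend builds the wrapper internally when it configures the column family.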
And BTW, CC to @roman <[email protected]> and @Yuan Mei <[email protected]> as you may be interested in this topic.

Best,
Zakelly

On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <[email protected]> wrote:

> Hi Zakelly,
>
> I have removed the ReducingMergeState.set method to minimize the changes to
> public APIs. The modified FLIP is at the same -
>
> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>
> Community members,
> This proposal is a great addition to FrocksDB/Flink to enable applications
> to harness the power of RocksDB. This addition will help write-heavy
> workloads and let users build associative data structures in a streaming
> fashion.
>
> Please review, and I can answer any question.
>
> Best,
> -Soumitra.
>
> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your review and +1!
> >
> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]> wrote:
> >
> >> Thanks for the FLIP! I'd +1 for the direction of enhancing the reducing and
> >> aggregating state. It's important that we could leverage Rocksdb's merge
> >> operators to eliminate unnecessary `get()`. However I have a few
> >> questions:
> >>
> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` in
> >> frocksdb side, so how could user pass this instance to the RocksDB State
> >> backend and what is the relationship with the Flink's `ReduceFunction` or
> >> `AggregateFunction`. I would suggest we may 'translate' user's
> >> `ReduceFunction` into some frocksdb's merge operator, thus for flink we
> >> still maintain the original experience. WDYT?
> >>
> >
> > Yes, that is the whole idea. Since ReduceFunction and AggregateFunction
> > are existing primitives and they are associative and serializable, I am
> > building on them.
> >
> > This is how they are wired. There are two new classes
> > RocksDBReducingMergeOperator, RocksDBAggregatingMergeOperator implementing
> > AbstractAssociativeMergeOperator in Flink. The ColumnFamily is configured
> > with one of them. RocksDBReducingMergeOperator for ReducingMergeState, and
> > RocksDBAggregatingMergeOperator for AggregatingMergeState types. These
> > classes get the callback from frocksdb, handle the serde, and call the user
> > defined ReduceFunction and AggregateFunction.
> >
> >>
> >> 2. I read you introduce a new `ReducingMergeState` with a new `set()`
> >> method compared with `ReducingState`, is this necessary? I mean if we
> >> intend to optimize performance solely through the use of a merge operator,
> >> this is not necessary, right? I do not recommend introducing too many
> >> public APIs, as this would force us to consider their
> >> semantics. Specifically for example, in changelog stream processing, if the
> >> `merge` operation were to permit state-setting operations, it would
> >> complicate potential future retraction (or reverse merge/aggregating)
> >> operations. WDYT?
> >>
> >
> > ReducingMergeState.set is just a shortcut for clear-add. I agree with your
> > point, I will remove it.
> >
> > Best,
> > -Soumitra.
> >
> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]> wrote:
> >>
> >> > Dear Community Members,
> >> >
> >> > I want to start discussion on the two tickets I filed recently:
> >> > Add support for Java based AssociativeMergeOperator via JNI
> >> > <https://issues.apache.org/jira/browse/FLINK-39455>
> >> > Support ReducingMergeState and AggregatingMergeState backed by Java based
> >> > associative merge operators
> >> > <https://issues.apache.org/jira/browse/FLINK-39456>
> >> >
> >> > Copying the motivation from the FLIP doc
> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
> >> >
> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState state
> >> > variables that do a synchronous read-modify-write on every add call. While
> >> > this works great in many scenarios, for write-heavy workloads this can be
> >> > expensive and may become a bottleneck.
> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed
> >> > for commutative and associative operations: integer counters, set union,
> >> > list append, approximate sketches, top-K structures, Bloom filter, and
> >> > similar patterns. However, frocksdb (the RocksDB fork used in Flink) does
> >> > not support Java based associative merge operators.
> >> >
> >> > This FLIP has two parts:
> >> > 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
> >> > 2. Support ReducingMergeState and AggregatingMergeState backed by Java
> >> > based associative merge operators
> >> >
> >> > The first part proposes exposing the associative merge operator as a Java
> >> > class in frocksdb with minimal JNI overhead. RocksDB can call these
> >> > operators during flushing and compaction.
> >> > The second part leverages the frocksdb support developed in the first part
> >> > to support ReducingMergeState and AggregatingMergeState state variables
> >> > with user defined ReduceFunction and AggregateFunction using rocksdb
> >> > backend.
> >> >
> >> > This enhancement opens up a powerful feature of rocksdb to Java. Flink
> >> > users can use it to build interesting associative data structures
> >> > on streaming data. I have added benchmark details from a prototype
> >> > implementation in the FLIP doc.
> >> >
> >> > Looking forward to feedback.
> >> >
> >> > FLIP in Google doc
> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
> >> >
> >> > Best,
> >> > -Soumitra.
