Hey Soumitra, thanks for the proposal; the direction is a great exploration!
I have the following questions:

1. *Motivation Use Case:* We need a more specific SQL/DataStream use case, from a user's perspective, to motivate this change. This would be a great addition to the FLIP.
2. This is essentially a DB-level operation and optimization. Can it be implemented without changing Flink's Public Interfaces?
3. Most importantly, how will this land end-to-end? It involves rewriting SQL and existing DS (DataStream) operators. How will it be used from an end-to-end perspective?

Best,
Yuan

On Mon, Jun 1, 2026 at 12:44 PM Soumitra Kumar <[email protected]> wrote:

> Hi Roman,
>
> Thanks for your thorough review; please find my comments below.
>
> On Sun, May 31, 2026 at 2:42 PM Roman Khachatryan <[email protected]> wrote:
>
>> 1. Is it possible that reads will become more costly in some scenarios?
>>
>> Per my understanding, on compaction RocksDB would need to read and write back the same state (compared to the current implementation). So the benefit is deferring and "batching" those read/write operations.
>>
>> Current approach
>> add: read + deserialize + aggregate + serialize + write (IO#1)
>> get: read + deserialize (IO#2)
>>
>> Proposed approach
>> add: serialize + write (IO#1)
>> compact: read + deserialize + aggregate + serialize + write (IO#2)
>> get: read + deserialize (IO#3)
>>
>> But if adds are mostly followed by gets, then there's more IO, because RocksDB needs to make an additional call via JNI before serving reads, right?
>> If that's the case, do we need two implementations and select one depending on the case?
>
> Your calculation of the cost of the proposed approach is correct. If there is frequent read-after-write, then the merge operator does not add value, and users should use the existing read-modify-write based state variables instead. The proposal does not change the behavior of any of the existing state variables.
> Users can decide when to use the new state variables that support custom Reduce/Aggregate functions.
>
>> 2. Speaking more generally, could you list the motivating use cases, e.g. existing Flink DataStream operators, custom operators, SQL operators? If the impact depends on the operator, it would be helpful to understand how the existing operators would be affected.
>
> I got the idea for this enhancement from a real use case at work. I have captured the scenarios in this repo: <https://github.com/soumitrak/flink_streaming_event_reordering>. Essentially, the scenario is to reorder events by event time and process the ordered events every minute. In this case, I am building a sorted list using an associative operator that merges two sorted lists.
>
> In general, the proposal should benefit write-heavy (read-light) scenarios where the computation can be expressed as a Reduce/Aggregate function. The existing state variables do a read-modify-write on every add, which becomes an ingestion bottleneck for write-heavy use cases. See the performance comparison from the prototype implementation here: <https://github.com/soumitrak/flink_streaming_event_reordering#performance-comparison>.
>
>> 3. Could you clarify how the Reduce/Aggregate function calls will be dispatched? In theory, if an operator has two such states, their states will end up in the same RocksDB instance and we need to distinguish them, right?
>
> RocksDB allows one merge operator per ColumnFamily, and AFAIU, Flink creates one ColumnFamily per state variable. Currently, Flink sets the "stringappendtest" merge operator <https://github.com/apache/flink/blob/6c756a084d40f4716cdfdb4fdd71b0c919c448e6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java#L133> for all ColumnFamilies. In this proposal, Flink can set a user-provided, Java-based Reduce/Aggregate function as the merge operator.
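To make the associativity requirement concrete for the event-reordering use case: merging two sorted lists is associative, so RocksDB may combine operands in any grouping and still end up with the same sorted list. The following is a minimal standalone sketch, assuming the sorted list holds event timestamps as `Long`. The class name `SortedListMerge` is hypothetical, and `java.util.function.BinaryOperator` stands in for Flink's `ReduceFunction` so the snippet compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Hypothetical sketch: a two-pointer merge of ascending lists. Because the
 * operation is associative, operands can be combined in any grouping.
 */
public class SortedListMerge {

    // Merge two ascending lists into one ascending list.
    public static final BinaryOperator<List<Long>> MERGE_SORTED = (a, b) -> {
        List<Long> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            if (a.get(i) <= b.get(j)) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // Copy whichever tail remains.
        while (i < a.size()) {
            out.add(a.get(i++));
        }
        while (j < b.size()) {
            out.add(b.get(j++));
        }
        return out;
    };

    public static void main(String[] args) {
        List<Long> x = List.of(1L, 5L, 9L);
        List<Long> y = List.of(2L, 3L, 10L);
        List<Long> z = List.of(4L, 6L);
        // (x merge y) merge z versus x merge (y merge z): grouping does not matter.
        List<Long> left = MERGE_SORTED.apply(MERGE_SORTED.apply(x, y), z);
        List<Long> right = MERGE_SORTED.apply(x, MERGE_SORTED.apply(y, z));
        System.out.println(left);               // [1, 2, 3, 4, 5, 6, 9, 10]
        System.out.println(left.equals(right)); // true
    }
}
```

In a Flink job, the same logic would presumably live in a `ReduceFunction<List<Long>>` handed to the state descriptor; that wiring is not shown here.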
> In your example, if the operator has two state variables, there will be two ColumnFamilies, and Flink can set a custom ReduceFunction for one variable and a custom AggregateFunction for the other. We don't need to change anything at this layer in Flink/RocksDB.
>
>> 4. Function dispatch during restore
>> IIRC, aggregate functions might not be available during the restore operation, because the state descriptors containing these functions are set later. Is this a concern? Do we need to disable compactions during restore?
>
> Great point! The Reduce/Aggregate functions in Flink are already serializable, and in the proposal they are serialized into the savepoint and restored when RocksDB is loaded. This allows RocksDB to call these functions during compaction before the state descriptors are set. This way, we don't need to disable compaction during restore.
>
>> 5. Remote compactions
>> In light of the Disaggregated State Backend, Flink might benefit from remote compactions. This may help with predictability (no compaction impact on processing) and achieve better cost/utilization. AFAIK ForSt doesn't support it ATM, but it would be great to keep this possibility open. And I think the current proposal will make remote compactions problematic.
>
> TBH, I don't understand ForSt in enough detail to comment on this item. Since the proposal exposes associative merge operators, supporting them in ForSt should not be an issue. In fact, if ForSt does not support associative merge operators, I will volunteer to add that support, but let's get this proposal accepted first.
>
>> 6. Alternative: ListState
>> Have you considered using ListState to accumulate the records and then reduce/aggregate on read (or incrementally on write)? Basically, doing the same thing as you proposed, but purely in Java.
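The ListState alternative can be illustrated with a small in-memory model. The sketch below is hypothetical (`ListBackedReducingState` is not a Flink class): `add()` only appends, `get()` folds the elements left to right in insertion order, and an optional `compact()` collapses the list to its reduced value. Because the fold order is fixed, the function does not need to be associative; subtraction is used only to demonstrate that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Hypothetical in-memory model of "accumulate in a list, reduce on read".
 * Not a Flink API; names are illustrative only.
 */
public class ListBackedReducingState<T> {
    private final List<T> elements = new ArrayList<>();
    private final BinaryOperator<T> reducer;

    public ListBackedReducingState(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    /** Write path: append only, no read-modify-write. */
    public void add(T value) {
        elements.add(value);
    }

    /** Read path: left fold in insertion order; returns null when empty. */
    public T get() {
        if (elements.isEmpty()) {
            return null;
        }
        T acc = elements.get(0);
        for (int i = 1; i < elements.size(); i++) {
            acc = reducer.apply(acc, elements.get(i));
        }
        return acc;
    }

    /** Optional incremental step: replace the accumulated prefix with its folded value. */
    public void compact() {
        T reduced = get();
        elements.clear();
        if (reduced != null) {
            elements.add(reduced);
        }
    }

    public int size() {
        return elements.size();
    }

    public static void main(String[] args) {
        // Subtraction is not associative, but a left fold in arrival order is well defined.
        ListBackedReducingState<Integer> state = new ListBackedReducingState<>((a, b) -> a - b);
        state.add(10);
        state.add(3);
        state.add(2);
        System.out.println(state.get()); // (10 - 3) - 2 = 5
        state.compact();
        System.out.println(state.size()); // 1
        System.out.println(state.get());  // 5
    }
}
```

Collapsing a prefix leaves the left-fold result unchanged, which is why, in this model, reducing incrementally on write and reducing on read can be mixed freely.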
>>
>> I think it should address all the concerns I mentioned above; plus, it:
>> - removes the associativity requirement (because we control the order)
>> - removes the RocksDB dependency
>> - gives some flexibility in when to aggregate (incrementally on write and/or on read)
>>
>> (I couldn't find this in the Rejected Alternatives; sorry if I missed it.)
>
> ListState uses the "stringappendtest" merge operator, defined in the C++ layer of FRocksDB, which concatenates values using a comma as the delimiter. I could implement the sorted list with this construct, but reads would be more expensive than with the proposal, since the sorting would happen during the read. The proposal adds support for pure Java-based associative merge operators that RocksDB invokes through JNI. There is JNI overhead, but it is not on the write thread; the write path is as fast as the ListState implementation. I have a prototype implementation that reduces the JNI overhead. I think this can be a great addition to Flink and, like other advanced features, it requires understanding and tuning RocksDB.
>
> Best,
> -Soumitra.
>
>> On Tue, May 26, 2026 at 3:50 PM Zakelly Lan <[email protected]> wrote:
>>
>> > Hi Soumitra,
>> >
>> > I'm getting somewhat confused. I was expecting no public API change on the Flink side, so that users could use the original state descriptors to create ReducingState/AggregatingState. On the implementation side, taking RocksDBReducingState as an example, a wrapper around the user-defined ReduceFunction that extends AbstractAssociativeMergeOperator would be provided to the frocksdb side. This may be a simple optimization, offering no additional new use cases. I may have missed some information, so please correct me if I'm wrong.
>> >
>> > And BTW, CC @roman <[email protected]> and @Yuan Mei <[email protected]>, as you may be interested in this topic.
>> >
>> > Best,
>> > Zakelly
>> >
>> > On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <[email protected]> wrote:
>> >
>> >> Hi Zakelly,
>> >>
>> >> I have removed the ReducingMergeState.set method to minimize the changes to public APIs. The modified FLIP is at the same link:
>> >> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>> >>
>> >> Community members,
>> >> This proposal is a great addition to FRocksDB/Flink, enabling applications to harness the power of RocksDB. It will help write-heavy workloads and let users build associative data structures in a streaming fashion.
>> >>
>> >> Please review; I am happy to answer any questions.
>> >>
>> >> Best,
>> >> -Soumitra.
>> >>
>> >> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]> wrote:
>> >>
>> >> > Hi Zakelly,
>> >> >
>> >> > Thanks for your review and +1!
>> >> >
>> >> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]> wrote:
>> >> >
>> >> >> Thanks for the FLIP! I'd +1 the direction of enhancing the reducing and aggregating state. It's important that we leverage RocksDB's merge operators to eliminate unnecessary `get()` calls. However, I have a few questions:
>> >> >>
>> >> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the frocksdb side, so how would users pass this instance to the RocksDB state backend, and what is its relationship with Flink's `ReduceFunction` or `AggregateFunction`? I would suggest we 'translate' the user's `ReduceFunction` into a frocksdb merge operator, so that Flink users keep the original experience. WDYT?
>> >> >
>> >> > Yes, that is the whole idea.
>> >> > Since ReduceFunction and AggregateFunction are existing primitives, and they are associative and serializable, I am building on them.
>> >> >
>> >> > This is how they are wired. There are two new classes in Flink, RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator, implementing AbstractAssociativeMergeOperator. Each ColumnFamily is configured with one of them: RocksDBReducingMergeOperator for ReducingMergeState and RocksDBAggregatingMergeOperator for AggregatingMergeState. These classes receive the callback from frocksdb, handle serialization/deserialization, and call the user-defined ReduceFunction or AggregateFunction.
>> >> >
>> >> >> 2. I see you introduce a new `ReducingMergeState` with a new `set()` method compared with `ReducingState`; is this necessary? If we intend to optimize performance solely through the use of a merge operator, it is not necessary, right? I do not recommend introducing too many public APIs, as this would force us to consider their semantics. For example, in changelog stream processing, if the `merge` operation were to permit state-setting operations, it would complicate potential future retraction (or reverse merge/aggregation) operations. WDYT?
>> >> >
>> >> > ReducingMergeState.set is just a shortcut for clear-then-add. I agree with your point; I will remove it.
>> >> >
>> >> > Best,
>> >> > -Soumitra.
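The wiring described above (the storage engine calls back into Java, the wrapper deserializes both values, calls the user function, and serializes the result) might look roughly like the following. This is a hedged sketch: the `merge(byte[], byte[])` callback shape, the `Serde` interface, and the class names are assumptions made for illustration, not the frocksdb `AbstractAssociativeMergeOperator` API proposed in the FLIP. `BinaryOperator` stands in for the user's `ReduceFunction`.

```java
import java.nio.ByteBuffer;
import java.util.function.BinaryOperator;

public class ReducingMergeOperatorSketch {

    /** Hypothetical serializer contract, standing in for Flink's TypeSerializer. */
    interface Serde<T> {
        byte[] serialize(T value);

        T deserialize(byte[] bytes);
    }

    /**
     * Hypothetical callback target: the storage engine passes the existing value
     * (null if the key has none yet) and one new operand, and stores the result.
     */
    static final class ReducingMergeOperator<T> {
        private final BinaryOperator<T> reduceFunction; // stands in for the user's ReduceFunction
        private final Serde<T> serde;

        ReducingMergeOperator(BinaryOperator<T> reduceFunction, Serde<T> serde) {
            this.reduceFunction = reduceFunction;
            this.serde = serde;
        }

        byte[] merge(byte[] existing, byte[] operand) {
            if (existing == null) {
                return operand; // first value for this key: nothing to combine
            }
            // The wrapper, not the user function, owns (de)serialization.
            T left = serde.deserialize(existing);
            T right = serde.deserialize(operand);
            return serde.serialize(reduceFunction.apply(left, right));
        }
    }

    /** Fixed-width big-endian encoding for Long values. */
    static final Serde<Long> LONG_SERDE = new Serde<Long>() {
        @Override
        public byte[] serialize(Long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        @Override
        public Long deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getLong();
        }
    };

    public static void main(String[] args) {
        ReducingMergeOperator<Long> sumOperator = new ReducingMergeOperator<>(Long::sum, LONG_SERDE);
        byte[] stored = null;
        for (long operand : new long[] {4L, 5L, 6L}) {
            stored = sumOperator.merge(stored, LONG_SERDE.serialize(operand));
        }
        System.out.println(LONG_SERDE.deserialize(stored)); // 15
    }
}
```

Following the discussion, each ColumnFamily (one per state variable) would get its own operator instance, which is how two reducing states in the same operator stay separate.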
>> >> >
>> >> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]> wrote:
>> >> >>
>> >> >> > Dear Community Members,
>> >> >> >
>> >> >> > I want to start a discussion on the two tickets I filed recently:
>> >> >> > Add support for Java based AssociativeMergeOperator via JNI <https://issues.apache.org/jira/browse/FLINK-39455>
>> >> >> > Support ReducingMergeState and AggregatingMergeState backed by Java based associative merge operators <https://issues.apache.org/jira/browse/FLINK-39456>
>> >> >> >
>> >> >> > Copying the motivation from the FLIP doc <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>> >> >> >
>> >> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState state variables that do a synchronous read-modify-write on every add call. While this works well in many scenarios, for write-heavy workloads it can be expensive and may become a bottleneck.
>> >> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed for associative operations: integer counters, set union, list append, approximate sketches, top-K structures, Bloom filters, and similar patterns. However, frocksdb (the RocksDB fork used in Flink) does not support Java-based associative merge operators.
>> >> >> >
>> >> >> > This FLIP has two parts:
>> >> >> > 1. Support for Java-based AssociativeMergeOperator in frocksdb via JNI
>> >> >> > 2. Support ReducingMergeState and AggregatingMergeState backed by Java-based associative merge operators
>> >> >> >
>> >> >> > The first part proposes exposing the associative merge operator as a Java class in frocksdb with minimal JNI overhead. RocksDB can call these operators during flushes and compactions.
>> >> >> > The second part leverages the frocksdb support developed in the first part to support ReducingMergeState and AggregatingMergeState state variables with user-defined ReduceFunction and AggregateFunction on the RocksDB backend.
>> >> >> >
>> >> >> > This enhancement exposes a powerful RocksDB feature to Java. Flink users can use it to build interesting associative data structures on streaming data. I have added benchmark details from a prototype implementation in the FLIP doc.
>> >> >> >
>> >> >> > Looking forward to feedback.
>> >> >> >
>> >> >> > FLIP in Google doc <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>> >> >> >
>> >> >> > Best,
>> >> >> > -Soumitra.
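The trade-off debated in this thread (a synchronous read-modify-write on every add versus blind writes that a merge operator folds later) can be illustrated with a toy in-memory model. It is not RocksDB and does not model real IO: it only counts reads issued on the caller's path and merge-function calls performed by a stand-in for compaction. All class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy model (not RocksDB) of the two write paths; it counts operations, not IO. */
public class MergeVsReadModifyWrite {

    /** Current approach: every add() reads the old value, reduces, and writes back. */
    static final class ReadModifyWriteStore {
        final Map<String, Long> table = new HashMap<>();
        final BinaryOperator<Long> f;
        int foregroundReads = 0;

        ReadModifyWriteStore(BinaryOperator<Long> f) { this.f = f; }

        void add(String key, long value) {
            foregroundReads++; // this read sits on the write path
            Long old = table.get(key);
            table.put(key, old == null ? value : f.apply(old, value));
        }

        Long get(String key) {
            foregroundReads++;
            return table.get(key);
        }
    }

    /** Proposed approach (simplified): add() appends an operand; folding is deferred. */
    static final class MergeOperatorStore {
        final Map<String, List<Long>> operands = new HashMap<>();
        final BinaryOperator<Long> f;
        int foregroundReads = 0;
        int backgroundMergeCalls = 0;

        MergeOperatorStore(BinaryOperator<Long> f) { this.f = f; }

        void add(String key, long value) {
            operands.computeIfAbsent(key, k -> new ArrayList<>()).add(value); // blind write
        }

        /** Stand-in for compaction: fold each key's pending operands into one value. */
        void compact() {
            for (List<Long> ops : operands.values()) {
                Long acc = ops.get(0);
                for (int i = 1; i < ops.size(); i++) {
                    acc = f.apply(acc, ops.get(i));
                    backgroundMergeCalls++;
                }
                ops.clear();
                ops.add(acc);
            }
        }

        /** Reads fold whatever operands have not been compacted yet. */
        Long get(String key) {
            foregroundReads++;
            List<Long> ops = operands.get(key);
            if (ops == null) {
                return null;
            }
            Long acc = ops.get(0);
            for (int i = 1; i < ops.size(); i++) {
                acc = f.apply(acc, ops.get(i));
            }
            return acc;
        }
    }

    public static void main(String[] args) {
        ReadModifyWriteStore rmw = new ReadModifyWriteStore(Long::sum);
        MergeOperatorStore mo = new MergeOperatorStore(Long::sum);
        for (long v = 1; v <= 1000; v++) {
            rmw.add("k", v);
            mo.add("k", v);
        }
        mo.compact();
        System.out.println(rmw.get("k") + " " + mo.get("k")); // 500500 500500
        System.out.println(rmw.foregroundReads + " " + mo.foregroundReads + " " + mo.backgroundMergeCalls); // 1001 1 999
    }
}
```

With 1,000 adds and one read, the read-modify-write store issues 1,001 foreground reads, while the merge-operator store issues one and pushes 999 merge calls into the compaction stand-in. That background work is not free, and if reads frequently follow writes, the pending operands have to be merged on the read path instead, which is the read-after-write caveat raised earlier in the thread.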
