Hey Soumitra, thanks for the proposal; the direction is a great exploration!

I have the following questions:

   1. *Motivation Use Case:* We need a more specific SQL/DataStream use case
      from a user's perspective to motivate this change. This would be a great
      amendment to the FLIP.
   2. This is essentially a DB operation and optimization. Can it be
      implemented without changing Flink's Public Interfaces?
   3. Most importantly, how will this land end-to-end? This involves rewriting
      SQL and existing DS (DataStream) operators. How will it be used from an
      end-to-end perspective?


Best
Yuan


On Mon, Jun 1, 2026 at 12:44 PM Soumitra Kumar <[email protected]>
wrote:

> Hi Roman,
>
> Thanks for your thorough review; please find my comments below.
>
> On Sun, May 31, 2026 at 2:42 PM Roman Khachatryan <[email protected]>
> wrote:
>
>> 1. Is it possible that reads will become more costly in some scenarios?
>>
>> Per my understanding, on compaction RocksDB would need to read and write
>> back the same state (compared to the current implementation), so the
>> benefit is deferring and "batching" those read/write operations.
>>
>> Current approach
>> add: read + deserialize + aggregate + serialize + write (IO#1)
>> get: read + deserialize (IO#2)
>>
>> Proposed approach
>> add: serialize + write (IO#1)
>> compact: read + deserialize + aggregate + serialize + write (IO#2)
>> get: read + deserialize (IO#3)
>>
>> But if adds are mostly followed by gets, then there's more IO, because
>> RocksDB needs to make an additional call via JNI before serving reads,
>> right?
>> If that's the case, do we need two implementations and select one
>> depending on the case?
>>
>
> Your calculation of the cost of the proposed approach is correct. If there
> is frequent read-after-write, then the merge operator does not add value.
> Users should use the existing read-modify-write based state variables
> instead. The proposal does not change the behavior of any of the existing
> state variables. Users can decide when to use the new state variables that
> support custom Reduce/Aggregate functions.
>
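To make the cost breakdown above concrete, here is a toy in-memory model in plain Java (no RocksDB, no Flink, no JNI). One state does read-modify-write on every add; the other only appends merge operands and folds them either in a background "compaction" or on read. CostModel, RmwState and MergeState are hypothetical names, and the counters track logical reads/writes only, not real IO or JNI calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Toy model of the two write paths; counts logical store operations only.
final class CostModel {
    // Read-modify-write: every add reads the current value and writes the new one.
    static final class RmwState {
        private Long value; // null means "no value yet"
        private final BinaryOperator<Long> reduce;
        int reads, writes;

        RmwState(BinaryOperator<Long> reduce) { this.reduce = reduce; }

        void add(long v) {
            reads++;                                               // read + deserialize
            value = (value == null) ? v : reduce.apply(value, v); // aggregate
            writes++;                                              // serialize + write
        }

        Long get() { reads++; return value; }
    }

    // Merge-style: add only writes an operand; folding is deferred.
    static final class MergeState {
        private Long base; // value produced by the last compaction
        private final List<Long> operands = new ArrayList<>();
        private final BinaryOperator<Long> reduce;
        int reads, writes;

        MergeState(BinaryOperator<Long> reduce) { this.reduce = reduce; }

        void add(long v) { operands.add(v); writes++; } // serialize + write only

        // Fold the compacted base with all pending operands, in write order.
        private Long fold() {
            Long acc = base;
            for (long op : operands) {
                acc = (acc == null) ? op : reduce.apply(acc, op);
            }
            return acc;
        }

        // Background step: read, fold, and write the result back once.
        void compact() {
            if (operands.isEmpty()) return;
            reads++;
            base = fold();
            operands.clear();
            writes++;
        }

        // Reads must fold any operands that compaction has not yet merged.
        Long get() { reads++; return fold(); }
    }
}
```

With 1000 adds followed by one get, the model records 1001 reads for the read-modify-write state and 1 for the merge state. If instead every add is followed by a get before compaction runs, the merge state re-folds all pending operands on each read, which is the read-after-write case where the existing state types remain the better fit.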
>
>> 2. Speaking more generally, could you list the motivating use cases, e.g.
>> existing Flink DataStream Operators, custom operators, SQL operators?
>> If the impact depends on the operator, it would be helpful to understand
>> how the existing operators would be affected.
>>
>
> I got the idea for this enhancement from a real use case at work. I
> have captured the scenarios in this repo:
> <https://github.com/soumitrak/flink_streaming_event_reordering>.
> Essentially, the scenario is to reorder events based on event time and
> process the ordered events every minute. In this case, I am building a
> sorted list using an associative operator that merges two sorted lists.
>
> In general, the proposal should benefit write-heavy (read-light) scenarios
> where the computation can be expressed as a Reduce/Aggregate function. The
> existing state variables do read-modify-write and become an ingestion
> bottleneck for write-heavy use cases. See the performance comparison of the
> prototype implementation here:
> <https://github.com/soumitrak/flink_streaming_event_reordering#performance-comparison>.
>
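The event-reordering case can be sketched as a binary operator over lists that are each already sorted by event time. A stable merge (the left element goes first on equal timestamps) is associative, which is the property a merge operator relies on: operands may be combined in different groupings, but their order is preserved, so commutativity is not required. Event and SortedMerge are illustrative names, not taken from the linked repo.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative event type, ordered by event time only.
record Event(long eventTime, String payload) {}

final class SortedMerge {
    // Stable two-way merge of lists already sorted by eventTime.
    // On ties the left element goes first, which keeps the operator associative.
    static List<Event> mergeSorted(List<Event> left, List<Event> right) {
        List<Event> out = new ArrayList<>(left.size() + right.size());
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            if (right.get(j).eventTime() < left.get(i).eventTime()) {
                out.add(right.get(j++));
            } else {
                out.add(left.get(i++));
            }
        }
        out.addAll(left.subList(i, left.size()));
        out.addAll(right.subList(j, right.size()));
        return out;
    }
}
```

Merging three batches as `mergeSorted(mergeSorted(a, b), c)` or as `mergeSorted(a, mergeSorted(b, c))` yields the same list, including the relative order of events that share a timestamp.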
>
>> 3. Could you clarify how the Reduce/Aggregate function calls will be
>> dispatched?
>> In theory, if an operator has two such states, their states will end up in
>> the same RocksDB instance and we need to distinguish them, right?
>
>
> RocksDB allows one merge operator per ColumnFamily, and AFAIU, Flink
> creates one ColumnFamily per state variable. Currently, Flink sets the
> "stringappendtest"
> <https://github.com/apache/flink/blob/6c756a084d40f4716cdfdb4fdd71b0c919c448e6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java#L133>
> merge operator for all ColumnFamilies. In this proposal, Flink can set a
> user-provided, Java-based Reduce/Aggregate function as the merge operator.
> In your example, if the operator has two state variables, there will be
> two ColumnFamilies, and Flink can set a custom ReduceFunction for one
> variable and a custom AggregateFunction for the other. We don't need to
> change anything at this layer in Flink/RocksDB.
>
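Because RocksDB binds one merge operator to each column family, dispatch for several states in one operator reduces to a lookup by column family. The toy model below (plain Java, no RocksDB) keeps one operator per column-family name and folds pending operands with the operator of the family they were written to. ColumnFamilyModel and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model: one merge operator per column family, one column family per state.
final class ColumnFamilyModel {
    private final Map<String, BinaryOperator<Long>> mergeOperators = new HashMap<>();
    private final Map<String, Map<String, List<Long>>> operands = new HashMap<>();

    // Register a state: creates its column family with a dedicated merge operator.
    void register(String columnFamily, BinaryOperator<Long> op) {
        if (mergeOperators.putIfAbsent(columnFamily, op) != null) {
            throw new IllegalStateException("merge operator already set for " + columnFamily);
        }
        operands.put(columnFamily, new HashMap<>());
    }

    // Write path: append a merge operand under (columnFamily, key).
    void merge(String columnFamily, String key, long value) {
        operands.get(columnFamily).computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Read path: fold pending operands with the operator bound to that column family.
    Long get(String columnFamily, String key) {
        List<Long> ops = operands.get(columnFamily).get(key);
        if (ops == null || ops.isEmpty()) return null;
        BinaryOperator<Long> op = mergeOperators.get(columnFamily);
        return ops.stream().reduce(op).orElseThrow();
    }
}
```

Two states in the same operator, for example a sum and a max, get two column families, and the same key in each family is folded with its own function.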
>
>> 4. Function dispatch during restore
>> IIRC, aggregate functions might not be available during the restore
>> operation, because the state descriptors containing these functions are
>> set later.
>> Is this a concern? Do we need to disable compactions during restore?
>>
>
> Great point! The Reduce/Aggregate functions in Flink are already
> serializable, and in the proposal they are serialized in the savepoint and
> restored when RocksDB is loaded. This allows RocksDB to call these
> functions during compaction before the state descriptors are available.
> This way, we don't need to disable compaction during restore.
>
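The restore argument rests on the functions being serializable, so they can be stored with the state and rebuilt before any state descriptor is registered. A minimal round-trip with plain Java serialization illustrates the idea. SerializableReducer and SumReducer are hypothetical stand-ins for a user's ReduceFunction (Flink's ReduceFunction does extend Serializable), and the byte array stands in for wherever the FLIP would actually persist the function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a user reduce function; Flink's ReduceFunction is likewise Serializable.
interface SerializableReducer<T> extends Serializable {
    T reduce(T a, T b);
}

final class SumReducer implements SerializableReducer<Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long reduce(Long a, Long b) { return a + b; }
}

final class FunctionSnapshot {
    // Write the function's bytes as part of a (simulated) snapshot.
    static byte[] snapshot(SerializableReducer<?> fn) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        return bytes.toByteArray();
    }

    // Rebuild the function on restore, without needing any state descriptor.
    @SuppressWarnings("unchecked")
    static <T> SerializableReducer<T> restore(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SerializableReducer<T>) in.readObject();
        }
    }
}
```

One consequence of this design is that the user's function classes must be on the classpath when the backend is opened, since deserialization happens before any user code registers descriptors.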
>
>> 5. Remote compactions
>> In the light of Disaggregated State Backend, Flink might benefit from
>> remote compactions.
>> This may help with predictability (no compaction impact on processing) and
>> achieving better cost/utilization.
>> AFAIK ForSt doesn't support it ATM, but it would be great to keep this
>> possibility open.
>> And I think the current proposal will make remote compactions problematic.
>>
>
> TBH, I don't understand ForSt in enough detail to comment on this item.
> Since the proposal exposes associative merge operators, supporting it in
> ForSt should not be an issue. In fact, if ForSt does not support
> associative merge operators, I will volunteer to add them, but let's get
> this proposal settled first.
>
>
>> 6. Alternative: ListState
>> Have you considered using ListState to accumulate the records; and then
>> reduce/aggregate on read (or incrementally on write)?
>> Basically, doing the same thing as you proposed but purely in Java.
>>
>> I think it should address all the concerns I mentioned above; plus
>> - removes the associative requirement (because we control the order)
>> - removes RocksDB dependency
>> - gives some flexibility when to aggregate (incrementally on write and/or
>> on read)
>>
>> (I couldn't find this in the Rejected Alternatives, sorry if I missed
>> that).
>>
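Roman's alternative can be sketched without any new RocksDB feature: append raw records, then reduce them in insertion order on read, optionally collapsing the stored prefix on write. Because the fold always runs strictly left to right, the reduce function need not be associative; the digit-appending function in the usage below is not. ListAccumulator and collapseThreshold are hypothetical names, and an in-memory list stands in for ListState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Accumulate raw records (as ListState would), then reduce them in insertion order.
final class ListAccumulator<T> {
    private final List<T> records = new ArrayList<>();
    private final BinaryOperator<T> reduce;
    private final int collapseThreshold;

    ListAccumulator(BinaryOperator<T> reduce, int collapseThreshold) {
        this.reduce = reduce;
        this.collapseThreshold = collapseThreshold;
    }

    // Write path: append; once the list is long enough, fold it into one element.
    // Folding a prefix keeps the left-to-right order, so associativity is not needed.
    void add(T value) {
        records.add(value);
        if (records.size() >= collapseThreshold) {
            T folded = foldLeft();
            records.clear();
            records.add(folded);
        }
    }

    // Read path: strict left fold over whatever is stored.
    T get() {
        return records.isEmpty() ? null : foldLeft();
    }

    private T foldLeft() {
        T acc = records.get(0);
        for (int i = 1; i < records.size(); i++) {
            acc = reduce.apply(acc, records.get(i));
        }
        return acc;
    }
}
```

With `(a, b) -> a * 10 + b` and inputs 1, 2, 3, 4, both a lazy accumulator (never collapses) and an eager one (collapses every two records) return 1234, even though that function is not associative.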
>
> ListState uses the "stringappendtest" merge operator, defined in the C++
> layer of FrocksDB, which concatenates values using a comma character. I can
> implement the sorted list using this construct, but reads will be more
> expensive than with the proposal, since the sorting will happen during the
> read. The proposal adds support for pure Java-based associative merge
> operators that RocksDB will invoke via JNI. There will be JNI overhead, but
> it is not on the write thread; the write thread is as performant as with
> the ListState implementation. I have a prototype implementation that reduces
> the JNI overhead. I think this can be a great addition to Flink and, just
> like other advanced features, it requires understanding/tuning of RocksDB.
>
> Best,
> -Soumitra.
>
>
>>
>> On Tue, May 26, 2026 at 3:50 PM Zakelly Lan <[email protected]>
>> wrote:
>>
>> > Hi Soumitra,
>> >
>> > I'm a bit confused. I was expecting no public API change on the Flink
>> > side, with users still using the original state descriptors to
>> > create ReducingState/AggregatingState. On the implementation side,
>> > taking RocksDBReducingState as an example, a wrapper of the user-defined
>> > ReduceFunction that extends AbstractAssociativeMergeOperator would be
>> > provided to the frocksdb side. This may be a simple way of optimization,
>> > though it offers no new use cases. I may have missed some information,
>> > so please correct me if I'm wrong.
>> >
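The wrapper suggested here amounts to an adapter: a byte-level merge callback that deserializes the existing value and the new operand, applies the user's reduce function, and serializes the result. The callback shape is modeled loosely on RocksDB's C++ `AssociativeMergeOperator::Merge`, where `existing_value` is null when the key has no prior value. Everything else is a hypothetical stand-in: Reducer for Flink's ReduceFunction, Codec for a TypeSerializer, and the abstract AssociativeMerge class for the proposed AbstractAssociativeMergeOperator, whose real signature is defined by the FLIP, not here.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the proposed AbstractAssociativeMergeOperator.
// existingValue is null when the key has no value yet.
abstract class AssociativeMerge {
    abstract byte[] merge(byte[] key, byte[] existingValue, byte[] value);
}

// Stand-ins for Flink's ReduceFunction and TypeSerializer.
interface Reducer<T> { T reduce(T a, T b) throws Exception; }
interface Codec<T> { byte[] encode(T v); T decode(byte[] b); }

// Adapter: user reduce function -> byte-level associative merge.
final class ReducingMergeAdapter<T> extends AssociativeMerge {
    private final Reducer<T> reducer;
    private final Codec<T> codec;

    ReducingMergeAdapter(Reducer<T> reducer, Codec<T> codec) {
        this.reducer = reducer;
        this.codec = codec;
    }

    @Override
    byte[] merge(byte[] key, byte[] existingValue, byte[] value) {
        if (existingValue == null) return value; // first operand: nothing to reduce yet
        try {
            T merged = reducer.reduce(codec.decode(existingValue), codec.decode(value));
            return codec.encode(merged);
        } catch (Exception e) {
            throw new IllegalStateException("user reduce function failed", e);
        }
    }
}

// Fixed-width 8-byte encoding for longs.
final class LongCodec implements Codec<Long> {
    public byte[] encode(Long v) { return ByteBuffer.allocate(Long.BYTES).putLong(v).array(); }
    public Long decode(byte[] b) { return ByteBuffer.wrap(b).getLong(); }
}
```

The user keeps writing an ordinary reduce function; only the adapter knows about bytes and the storage callback, which matches the goal of keeping the original Flink experience.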
>> >
>> > And BTW, CC to @roman <[email protected]> and @Yuan Mei
>> > <[email protected]> as you may be interested in this topic.
>> >
>> >
>> > Best,
>> > Zakelly
>> >
>> > On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <[email protected]>
>> > wrote:
>> >
>> >> Hi Zakelly,
>> >>
>> >> I have removed the ReducingMergeState.set method to minimize the changes
>> >> to public APIs. The modified FLIP is at the same link:
>> >>
>> >> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>> >>
>> >> Community members,
>> >> This proposal is a great addition to FrocksDB/Flink that enables
>> >> applications to harness the power of RocksDB. It will help write-heavy
>> >> workloads and let users build associative data structures in a
>> >> streaming fashion.
>> >>
>> >> Please review; I am happy to answer any questions.
>> >>
>> >> Best,
>> >> -Soumitra.
>> >>
>> >> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]>
>> >> wrote:
>> >>
>> >> > Hi Zakelly,
>> >> >
>> >> > Thanks for your review and +1!
>> >> >
>> >> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]>
>> >> > wrote:
>> >> >
>> >> >> Thanks for the FLIP! I'd +1 the direction of enhancing the reducing
>> >> >> and aggregating state. It's important that we can leverage RocksDB's
>> >> >> merge operators to eliminate unnecessary `get()` calls. However, I have
>> >> >> a few questions:
>> >> >>
>> >> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the
>> >> >> frocksdb side, so how could users pass this instance to the RocksDB
>> >> >> state backend, and what is its relationship with Flink's
>> >> >> `ReduceFunction` or `AggregateFunction`? I would suggest we
>> >> >> 'translate' the user's `ReduceFunction` into a frocksdb merge
>> >> >> operator, so that Flink users keep the original experience. WDYT?
>> >> >>
>> >> >
>> >> > Yes, that is the whole idea. Since ReduceFunction and AggregateFunction
>> >> > are existing primitives and they are associative and serializable, I am
>> >> > building on them.
>> >> >
>> >> > This is how they are wired. There are two new classes in Flink,
>> >> > RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator,
>> >> > implementing AbstractAssociativeMergeOperator. The ColumnFamily is
>> >> > configured with one of them: RocksDBReducingMergeOperator for
>> >> > ReducingMergeState, and RocksDBAggregatingMergeOperator for
>> >> > AggregatingMergeState types. These classes get the callback from
>> >> > frocksdb, handle the serde, and call the user-defined ReduceFunction
>> >> > and AggregateFunction.
>> >> >
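For the aggregating case, one plausible mapping (an assumption; the thread does not spell it out) is that each add writes a one-element accumulator as the merge operand, the merge callback combines accumulators with AggregateFunction's `merge`, and `getResult` runs only on read. The sketch stays at the object level and leaves out serde. Aggregator is a hypothetical interface mirroring the four AggregateFunction methods, and Average and AggregatingMergeModel are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring Flink's AggregateFunction<IN, ACC, OUT> methods.
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Example: average of longs, accumulator = {sum, count}.
final class Average implements Aggregator<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] add(Long v, long[] acc) { return new long[] {acc[0] + v, acc[1] + 1}; }
    public Double getResult(long[] acc) { return acc[1] == 0 ? null : (double) acc[0] / acc[1]; }
    public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
}

// Model of the write/read paths: add emits an accumulator operand without reading.
final class AggregatingMergeModel<IN, ACC, OUT> {
    private final Aggregator<IN, ACC, OUT> fn;
    private final List<ACC> operands = new ArrayList<>();

    AggregatingMergeModel(Aggregator<IN, ACC, OUT> fn) { this.fn = fn; }

    // Write path: wrap the input in a fresh accumulator and append it as an operand.
    void add(IN value) { operands.add(fn.add(value, fn.createAccumulator())); }

    // What a merge callback would do: fold operand accumulators with merge().
    ACC mergeOperands() {
        ACC acc = fn.createAccumulator();
        for (ACC op : operands) acc = fn.merge(acc, op);
        return acc;
    }

    // Read path: merge pending accumulators, then compute the result.
    OUT get() { return fn.getResult(mergeOperands()); }
}
```

This relies on `merge` being associative over accumulators, which holds for the sum/count pair used here.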
>> >> >
>> >> >>
>> >> >> 2. I see you introduce a new `ReducingMergeState` with a new `set()`
>> >> >> method compared with `ReducingState`; is this necessary? I mean, if
>> >> >> we intend to optimize performance solely through the use of a merge
>> >> >> operator, it is not necessary, right? I do not recommend introducing
>> >> >> too many public APIs, as this would force us to consider their
>> >> >> semantics. For example, in changelog stream processing, if the
>> >> >> `merge` operation were to permit state-setting operations, it would
>> >> >> complicate potential future retraction (or reverse merge/aggregating)
>> >> >> operations. WDYT?
>> >> >>
>> >> >
>> >> > ReducingMergeState.set is just a shortcut for clear-add. I agree with
>> >> > your point; I will remove it.
>> >> >
>> >> > Best,
>> >> > -Soumitra.
>> >> >
>> >> >
>> >> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]>
>> >> >> wrote:
>> >> >>
>> >> >> > Dear Community Members,
>> >> >> >
>> >> >> > I want to start a discussion on the two tickets I filed recently:
>> >> >> > Add support for Java based AssociativeMergeOperator via JNI
>> >> >> > <https://issues.apache.org/jira/browse/FLINK-39455>
>> >> >> > Support ReducingMergeState and AggregatingMergeState backed by Java
>> >> >> > based associative merge operators
>> >> >> > <https://issues.apache.org/jira/browse/FLINK-39456>
>> >> >> >
>> >> >> > Copying the motivation from the FLIP doc
>> >> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>> >> >> >
>> >> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState state
>> >> >> > variables that do a synchronous read-modify-write on every add call.
>> >> >> > While this works well in many scenarios, for write-heavy workloads it
>> >> >> > can be expensive and may become a bottleneck.
>> >> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive
>> >> >> > designed for associative operations: integer counters, set union,
>> >> >> > list append, approximate sketches, top-K structures, Bloom filters,
>> >> >> > and similar patterns. However, frocksdb (the RocksDB fork used in
>> >> >> > Flink) does not support Java-based associative merge operators.
>> >> >> >
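As an example of the structures listed in the motivation, a bounded top-K merge is associative: keeping the K largest values of a union gives the same answer however the unions are grouped, because a value dropped from a partial result cannot be among the K largest of the full union. This plain-Java sketch (TopK and the parameter k are illustrative) is the kind of function that could be supplied as a ReduceFunction; it is not part of the FLIP.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Bounded top-K of longs with multiset semantics (duplicates are kept).
final class TopK {
    // Keep the k largest values of the union of a and b, in descending order.
    static List<Long> merge(List<Long> a, List<Long> b, int k) {
        List<Long> all = new ArrayList<>(a.size() + b.size());
        all.addAll(a);
        all.addAll(b);
        all.sort(Comparator.reverseOrder());
        return new ArrayList<>(all.subList(0, Math.min(k, all.size())));
    }
}
```

Each merged value stays bounded at k elements, so the stored state does not grow with the number of operands.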
>> >> >> > This FLIP has two parts:
>> >> >> > 1. Support for Java-based AssociativeMergeOperator in frocksdb via JNI
>> >> >> > 2. Support for ReducingMergeState and AggregatingMergeState backed by
>> >> >> >    Java-based associative merge operators
>> >> >> >
>> >> >> > The first part proposes exposing the associative merge operator as a
>> >> >> > Java class in frocksdb with minimal JNI overhead. RocksDB can call
>> >> >> > these operators during flushing and compaction (and on reads, when
>> >> >> > unmerged operands remain).
>> >> >> > The second part leverages the frocksdb support developed in the first
>> >> >> > part to support ReducingMergeState and AggregatingMergeState state
>> >> >> > variables with user-defined ReduceFunction and AggregateFunction on
>> >> >> > the RocksDB backend.
>> >> >> >
>> >> >> > This enhancement opens up a powerful feature of RocksDB to Java.
>> >> >> > Flink users can use it to build interesting associative data
>> >> >> > structures on streaming data. I have added benchmark details from a
>> >> >> > prototype implementation in the FLIP doc.
>> >> >> >
>> >> >> > Looking forward to feedback.
>> >> >> >
>> >> >> > FLIP in Google doc
>> >> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>> >> >> >
>> >> >> > Best,
>> >> >> > -Soumitra.
>> >> >> >
>> >> >>
>> >> >
>> >>
>> >
>>
>
