Hi Soumitra,

Thanks for the FLIP! +1 for the direction of enhancing the reducing and
aggregating states. Being able to leverage RocksDB's merge operators to
eliminate unnecessary `get()` calls is important. However, I have a few
questions:

1. I see you will introduce `AbstractAssociativeMergeOperator` on the
frocksdb side. How would users pass an instance of it to the RocksDB state
backend, and how does it relate to Flink's `ReduceFunction` and
`AggregateFunction`? I would suggest that we 'translate' the user's
`ReduceFunction` into a frocksdb merge operator internally, so that Flink
users keep the original experience. WDYT?

2. I see you introduce a new `ReducingMergeState` that, compared with
`ReducingState`, adds a `set()` method. Is this necessary? If we intend to
improve performance solely by using a merge operator, it shouldn't be,
right? I'd rather not introduce too many new public APIs, since each one
commits us to defining and maintaining its semantics. For example, in
changelog stream processing, if the merge path were to permit
state-setting operations, it would complicate potential future retraction
(i.e., reverse merge/aggregate) operations. WDYT?
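To make the 'translate' suggestion in point 1 a bit more concrete, here is a rough sketch of how the backend could wrap a user's `ReduceFunction` as a merge operator. It assumes a merge operator shape of `byte[] merge(byte[] existingValue, byte[] operand)`; that interface, and the `ReduceFunction`/`Codec` stand-ins, are hypothetical placeholders for illustration, not the actual frocksdb `AbstractAssociativeMergeOperator` or Flink classes.

```java
import java.nio.ByteBuffer;

/**
 * Rough sketch only. AssociativeMergeOperator, ReduceFunction and Codec are
 * hypothetical stand-ins, not the real frocksdb or Flink types.
 */
public class ReduceMergeAdapterSketch {

    /** Hypothetical shape of a Java associative merge operator exposed by frocksdb. */
    interface AssociativeMergeOperator {
        /** Combines the existing value (null if none) with one operand, both serialized. */
        byte[] merge(byte[] existingValue, byte[] operand);
    }

    /** Stand-in for Flink's ReduceFunction. */
    @FunctionalInterface
    interface ReduceFunction<T> {
        T reduce(T value1, T value2) throws Exception;
    }

    /** Stand-in for Flink's TypeSerializer. */
    interface Codec<T> {
        byte[] serialize(T value);

        T deserialize(byte[] bytes);
    }

    /** Translates a user ReduceFunction into a merge operator inside the state backend. */
    static final class ReduceFunctionMergeOperator<T> implements AssociativeMergeOperator {
        private final ReduceFunction<T> reduceFunction;
        private final Codec<T> codec;

        ReduceFunctionMergeOperator(ReduceFunction<T> reduceFunction, Codec<T> codec) {
            this.reduceFunction = reduceFunction;
            this.codec = codec;
        }

        @Override
        public byte[] merge(byte[] existingValue, byte[] operand) {
            if (existingValue == null) {
                // No base value yet: the first operand becomes the value as-is.
                return operand;
            }
            try {
                T merged = reduceFunction.reduce(
                        codec.deserialize(existingValue), codec.deserialize(operand));
                return codec.serialize(merged);
            } catch (Exception e) {
                throw new IllegalStateException("ReduceFunction failed during merge", e);
            }
        }
    }

    /** Example codec: a long encoded as 8 big-endian bytes. */
    static final Codec<Long> LONG_CODEC = new Codec<Long>() {
        @Override
        public byte[] serialize(Long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        @Override
        public Long deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getLong();
        }
    };

    public static void main(String[] args) {
        // The user still writes an ordinary ReduceFunction (here: sum).
        AssociativeMergeOperator op =
                new ReduceFunctionMergeOperator<Long>((a, b) -> a + b, LONG_CODEC);

        // Simulate the storage layer folding three merge operands for one key.
        byte[] value = null;
        for (long v : new long[] {3L, 4L, 5L}) {
            value = op.merge(value, LONG_CODEC.serialize(v));
        }
        System.out.println(LONG_CODEC.deserialize(value)); // prints 12
    }
}
```

Since each Flink state already lives in its own RocksDB column family, the backend could presumably register one such adapter per state descriptor, so users keep writing plain `ReduceFunction`s. An `AggregateFunction` could likely be handled similarly by storing accumulators and using its `merge(ACC, ACC)`, with each input first turned into an accumulator via `createAccumulator()` and `add()`. The deserialize/serialize round trip on every merge is where much of the cost would land, so it would be worth checking against the prototype benchmarks.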
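Regarding point 2, here is a toy model of why a `set()` on the merge path worries me. It assumes (my assumption, nothing in the FLIP defines this) that a future retraction would be written as just another operand carrying the inverse of an earlier input, and that `set()` behaves roughly like a RocksDB Put. The `Kind`/`Operand`/`fold` names are made up for illustration, using a SUM aggregate.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Illustration only: models the operands stored for one key of a SUM aggregate.
 * SET stands for a hypothetical set(); RETRACT for a hypothetical retraction operand.
 */
public class RetractionSketch {

    enum Kind { ADD, RETRACT, SET }

    static final class Operand {
        final Kind kind;
        final long value;

        Operand(Kind kind, long value) {
            this.kind = kind;
            this.value = value;
        }
    }

    static Operand op(Kind kind, long value) {
        return new Operand(kind, value);
    }

    /** Folds operands in write order, as a merge operator would on read or compaction. */
    static long fold(List<Operand> operands) {
        long acc = 0;
        for (Operand o : operands) {
            switch (o.kind) {
                case ADD:
                    acc += o.value;
                    break;
                case RETRACT:
                    // Inverse of ADD: only correct if the retracted input is still part of acc.
                    acc -= o.value;
                    break;
                case SET:
                    // Overwrites everything merged so far, roughly like a RocksDB Put.
                    acc = o.value;
                    break;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        // ADD/RETRACT only: the result does not depend on operand order.
        System.out.println(fold(Arrays.asList(op(Kind.ADD, 5), op(Kind.ADD, 7), op(Kind.RETRACT, 5)))); // 7
        System.out.println(fold(Arrays.asList(op(Kind.RETRACT, 5), op(Kind.ADD, 7), op(Kind.ADD, 5)))); // 7

        // With SET in the mix, retracting an input that SET already discarded yields 95,
        // and the operator cannot tell that the 5 is no longer part of the value.
        System.out.println(fold(Arrays.asList(op(Kind.ADD, 5), op(Kind.SET, 100), op(Kind.RETRACT, 5)))); // 95
    }
}
```

RocksDB applies operands in write order, so the fold itself stays well defined; the problem is that retraction semantics become position-dependent once `set()` exists, which is exactly the kind of extra public-API semantics I'd like to avoid committing to.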


Best,
Zakelly


On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar wrote:

> Dear Community Members,
>
> I want to start discussion on the two tickets I filed recently:
> Add support for Java based AssociativeMergeOperator via JNI
> <https://issues.apache.org/jira/browse/FLINK-39455>
> Support ReducingMergeState and AggregatingMergeState backed by Java based
> associative merge operators
> <https://issues.apache.org/jira/browse/FLINK-39456>
>
> Copying the motivation from the FLIP doc
> <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>
> Flink supports RocksDBReducingState and RocksDBAggregatingState state
> variables that do a synchronous read-modify-write on every add call. While
> this works great in many scenarios, for write-heavy workloads this can be
> expensive and may become a bottleneck.
> RocksDB's AssociativeMergeOperator is a storage-level primitive designed
> for commutative and associative operations — integer counters, set union,
> list append, approximate sketches, top-K structures, Bloom filters, and
> similar patterns. However, frocksdb (the RocksDB fork used in Flink) does
> not support Java based associative merge operators.
>
> This FLIP has two parts:
> 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
> 2. Support ReducingMergeState and AggregatingMergeState backed by Java
> based associative merge operators
>
> The first part proposes exposing the associative merge operator as a Java
> class in frocksdb with minimal JNI overhead. RocksDB can call these
> operators during flushing and compaction.
> The second part leverages the frocksdb support developed in the first part
> to support ReducingMergeState and AggregatingMergeState state variables
> with user-defined ReduceFunction and AggregateFunction using the RocksDB
> backend.
>
> This enhancement opens up a powerful feature of rocksdb to Java. Flink
> users can use it to build interesting associative data structures
> on streaming data. I have added benchmark details from a prototype
> implementation in the FLIP doc.
>
> Looking forward to feedback.
>
> FLIP in Google doc:
> <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>
> Best,
> -Soumitra.
>
