Hi Soumitra,

I'm a bit confused. I was expecting no public API change on the Flink
side, so users could keep using the original state descriptors to create
ReducingState/AggregatingState. On the implementation side, taking
RocksDBReducingState as an example, a wrapper of the user-defined
ReduceFunction that extends AbstractAssociativeMergeOperator would be
provided to frocksdb. This may be a simple optimization that offers no
additional use cases. I may have missed some information, so please
correct me if I'm wrong.
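
To make sure we mean the same thing, here is a rough sketch of the kind of
wrapper described above. It is only an illustration, not the FLIP's design:
the shape of `AbstractAssociativeMergeOperator`, the `merge` signature, and
the `ReducingMergeOperator` name are assumptions, `ReduceFunction` is
redeclared locally so the snippet compiles on its own, and a fixed-width long
encoding stands in for Flink's serializers.

```java
import java.nio.ByteBuffer;

public class ReducingMergeSketch {

    /** Local stand-in for Flink's ReduceFunction so the sketch compiles alone. */
    interface ReduceFunction<T> {
        T reduce(T a, T b) throws Exception;
    }

    /** Hypothetical shape of the proposed frocksdb class; the real signature may differ. */
    abstract static class AbstractAssociativeMergeOperator {
        /** existing is null when the key has no value yet. */
        abstract byte[] merge(byte[] key, byte[] existing, byte[] operand);
    }

    /** Wrapper: decode both sides, apply the user's function, re-encode the result. */
    static final class ReducingMergeOperator extends AbstractAssociativeMergeOperator {
        private final ReduceFunction<Long> userFunction;

        ReducingMergeOperator(ReduceFunction<Long> userFunction) {
            this.userFunction = userFunction;
        }

        @Override
        byte[] merge(byte[] key, byte[] existing, byte[] operand) {
            if (existing == null) {
                return operand; // first value for this key: nothing to reduce
            }
            try {
                long reduced = userFunction.reduce(decode(existing), decode(operand));
                return encode(reduced);
            } catch (Exception e) {
                throw new IllegalStateException("User ReduceFunction failed", e);
            }
        }
    }

    // Fixed-width long encoding; an assumption standing in for a TypeSerializer.
    static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        AbstractAssociativeMergeOperator op = new ReducingMergeOperator(Long::sum);
        byte[] key = new byte[] {1};
        byte[] state = op.merge(key, null, encode(5L));
        state = op.merge(key, state, encode(7L));
        System.out.println(decode(state));
    }
}
```

In this sketch the user-facing side is untouched: the user still supplies only
a ReduceFunction, and the wrapper is what the backend would hand to the
storage layer.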


And BTW, CC to @roman <[email protected]> and @Yuan Mei
<[email protected]> as you may be interested in this topic.


Best,
Zakelly

On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <[email protected]>
wrote:

> Hi Zakelly,
>
> I have removed the ReducingMergeState.set method to minimize the changes to
> public APIs. The modified FLIP is at the same link:
>
> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>
> Community members,
> This proposal is a great addition to FrocksDB/Flink to enable applications
> to harness the power of RocksDB. This addition will help write-heavy
> workloads and let users build associative data structures in a streaming
> fashion.
>
> Please review; I am happy to answer any questions.
>
> Best,
> -Soumitra.
>
> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]>
> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your review and +1!
> >
> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]> wrote:
> >
> >> Thanks for the FLIP! I'd +1 for the direction of enhancing the reducing
> >> and aggregating state. It's important that we can leverage RocksDB's
> >> merge operators to eliminate unnecessary `get()` calls. However, I have
> >> a few questions:
> >>
> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the
> >> frocksdb side, so how would a user pass this instance to the RocksDB
> >> state backend, and what is its relationship to Flink's `ReduceFunction`
> >> or `AggregateFunction`? I would suggest we 'translate' the user's
> >> `ReduceFunction` into a frocksdb merge operator, so that on the Flink
> >> side we still maintain the original experience. WDYT?
> >>
> >
> > Yes, that is the whole idea. Since ReduceFunction and AggregateFunction
> > are existing primitives and they are associative and serializable, I am
> > building on them.
> >
> > This is how they are wired. There are two new classes,
> > RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator,
> > implementing AbstractAssociativeMergeOperator in Flink. The ColumnFamily
> > is configured with one of them: RocksDBReducingMergeOperator for
> > ReducingMergeState and RocksDBAggregatingMergeOperator for
> > AggregatingMergeState. These classes get the callback from frocksdb,
> > handle the serde, and call the user-defined ReduceFunction or
> > AggregateFunction.
> >
> >
> >>
> >> 2. I read that you introduce a new `ReducingMergeState` with a new
> >> `set()` method compared with `ReducingState`. Is this necessary? If we
> >> intend to optimize performance solely through the use of a merge
> >> operator, it is not needed, right? I do not recommend introducing too
> >> many public APIs, as this would force us to consider their semantics.
> >> For example, in changelog stream processing, if the `merge` operation
> >> were to permit state-setting operations, it would complicate potential
> >> future retraction (or reverse merge/aggregating) operations. WDYT?
> >>
> >
> > ReducingMergeState.set is just a shortcut for clear-add. I agree with
> > your point and will remove it.
> >
> > Best,
> > -Soumitra.
> >
> >
> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <[email protected]> wrote:
> >>
> >> > Dear Community Members,
> >> >
> >> > I want to start a discussion on the two tickets I filed recently:
> >> > Add support for Java based AssociativeMergeOperator via JNI
> >> > <https://issues.apache.org/jira/browse/FLINK-39455>
> >> > Support ReducingMergeState and AggregatingMergeState backed by Java
> >> > based associative merge operators
> >> > <https://issues.apache.org/jira/browse/FLINK-39456>
> >> >
> >> > Copying the motivation from the FLIP doc
> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
> >> >
> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState state
> >> > variables that do a synchronous read-modify-write on every add call.
> >> > While this works great in many scenarios, for write-heavy workloads
> >> > this can be expensive and may become a bottleneck.
> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive
> >> > designed for commutative and associative operations: integer counters,
> >> > set union, list append, approximate sketches, top-K structures, Bloom
> >> > filter, and similar patterns. However, frocksdb (the RocksDB fork used
> >> > in Flink) does not support Java based associative merge operators.
> >> >
> >> > This FLIP has two parts:
> >> > 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
> >> > 2. Support ReducingMergeState and AggregatingMergeState backed by Java
> >> > based associative merge operators
> >> >
> >> > The first part proposes exposing the associative merge operator as a
> >> > Java class in frocksdb with minimal JNI overhead. RocksDB can call
> >> > these operators during flushing and compaction.
> >> > The second part leverages the frocksdb support developed in the first
> >> > part to support ReducingMergeState and AggregatingMergeState state
> >> > variables with user-defined ReduceFunction and AggregateFunction using
> >> > the RocksDB backend.
> >> >
> >> > This enhancement opens up a powerful feature of rocksdb to Java. Flink
> >> > users can use it to build interesting associative data structures
> >> > on streaming data. I have added benchmark details from a prototype
> >> > implementation in the FLIP doc.
> >> >
> >> > Looking forward to feedback.
> >> >
> >> > FLIP in Google doc
> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
> >> >
> >> > Best,
> >> > -Soumitra.
> >> >
> >>
> >
>
