Hi Martjin,

I agree with you, Flink should do the right thing underneath the
declarative layer. And for the lower level APIs, Flink should continue to
improve on observability and recommendations to diagnose and resolve the
issues.

Best,
-Soumitra.

On Wed, Jun 3, 2026 at 11:03 PM Martijn Visser <[email protected]>
wrote:

> Hi Soumitra,
>
> Thanks for the FLIP. This is little outside of my scope of expertise,
> but I wanted to reply to one thing that you said, being
>
> >  Flink should not dictate a particular way of writing an application. It
> is up to the user to decide how to express their computation using
> different primitives. Different primitives provide different runtime
> characteristics, have different resource needs, and may need different
> tunings to achieve best performance.
>
> I don't think this is a true statement at all. Users struggle with
> writing good, performant streaming applications, and we need to make
> it as easy for them to achieve this. I would argue that in the
> declarative landscape of APIs, it's actually Flink that under the hood
> decides how computation is being achieved, and not the user. In my
> opinion, Flink should actually be (a bit) more opinionated towards
> users and guide them to the right approach, and offer primitives
> specifically for users who need lower level interfaces for specialized
> use cases.
>
> Best regards,
>
> Martijn
>
> Op do 4 jun 2026 om 07:30 schreef Soumitra Kumar <[email protected]
> >:
> >
> > Hi Roman,
> >
> > Thanks for your comments!
> >
> > I agree that there are several ways to implement the event reordering,
> and
> > in fact I have 3 implementations with different runtime characteristics.
> I
> > don't know how MapState can help with generic ordering, but my knowledge
> on
> > that is limited.
> >
> > Here is my line of thinking and justification of the FLIP:
> >
> >    1. Flink is a wonderful distributed processing engine and it already
> >    leverages RocksDB as one of the state backends.
> >    2. Flink should not dictate a particular way of writing an
> application.
> >    It is up to the user to decide how to express their computation using
> >    different primitives. Different primitives provide different runtime
> >    characteristics, have different resource needs, and may need different
> >    tunings to achieve best performance.
> >    3. RocksDB is a C/C++ engine and supports associative merge operators.
> >    In fact, ListState is built using one of the merge operators defined
> in C++
> >    in RocksDB.
> >    4. How cool would it be to enable support for Java-based associative
> >    merge operators in FRocksDB! This would enable Flink application
> developers
> >    to build aggregations using associative binary operations. A few
> >    interesting such aggregations are set union, set intersection, bloom
> >    filters, HLLs, Count-Min Sketch, etc.
> >
> > Needless to say, the devil is in the details to ensure this works at
> > reasonable performance. We can discuss and address those concerns at the
> PR
> > stage. I am motivated and excited about this enhancement. I have a few
> > things to add to the FLIP based on prior comments, and I am available to
> > answer any questions or concerns to make progress on the FLIP.
> >
> > Thanks again, best,
> > -Soumitra.
> >
> > On Wed, Jun 3, 2026 at 1:30 AM Roman Khachatryan <[email protected]>
> wrote:
> >
> > > Thanks for the clarifications,
> > >
> > > I briefly took a look at the use case
> > > > THIS <https://github.com/soumitrak/flink_streaming_event_reordering>
> > > repo.
> > > > Essentially, the scenario is to reorder events based on event time,
> and
> > > > process the ordered events every minute. In this case, I am building
> a
> > > > sorted list using an associative operator that implements merging two
> > > > sorted lists.
> > >
> > > Isn't it possible to use MapState keyed by event time?
> > > The sorting will come for free on RocksDB and PUT will only add the new
> > > element without touching the existing ones.
> > > (there is an API to check whether it's sorted or not)
> > >
> > > >> 6. Alternative: ListState
> > > > ListState uses the "stringappendtest" merge operator, defined in the
> C++
> > > > layer in FrocksDB, that concatenates strings using comma char. I can
> > > > implement the sorted list using this construct,
> > >
> > > I think we're talking about different things. My proposal was to use
> > > ListState without
> > > using the merge operator directly. Yes it will load all the records for
> > > aggregation - but
> > > that's the same as in the original proposal, isn't it?
> > > Anyways, if the requirement is to sort those records then using
> MapState
> > > keyed by
> > > the sort key looks like a better idea to me.
> > >
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Mon, Jun 1, 2026 at 6:44 AM Soumitra Kumar <
> [email protected]>
> > > wrote:
> > >
> > > > Hi Roman,
> > > >
> > > > Thanks for your thorough review, please find the comments below.
> > > >
> > > > On Sun, May 31, 2026 at 2:42 PM Roman Khachatryan <[email protected]>
> > > > wrote:
> > > >
> > > > > 1. Is it possible that reads will become more costly in some
> scenarios?
> > > > >
> > > > > Per my understanding, on compaction RocksDB would need to read and
> > > write
> > > > > back the same state (compared to the current implementation)
> > > > > So the benefit is deferring and "batching" those read/write
> operations.
> > > > >
> > > > > Current approach
> > > > > add: read + deserialize + aggregate + serialize + write (IO#1)
> > > > > get: read + deserialize (IO#2)
> > > > >
> > > > > Proposed approach
> > > > > add: serialize + write (IO#1)
> > > > > compact: read + deserialize + aggregate + serialize + write (IO#2)
> > > > > get: read + deserialize (IO#3)
> > > > >
> > > > > But if adds are mostly followed by gets then there's more IO
> because
> > > > > RocksDB needs to make an additional call via JNI before serving
> reads,
> > > > > right?
> > > > > If that's the case, we need to have two implementations and select
> one
> > > > > depending on the case?
> > > > >
> > > >
> > > > Your calculation of the cost of the proposed approach is correct. If
> > > there
> > > > is frequent read-after-write, then the merge operator does not add
> value.
> > > > Users should use the existing read-modify-write based state variables
> > > > instead. The proposal does not change the behavior of any of the
> existing
> > > > state variables. Users can decide when to use the new state variables
> > > that
> > > > support custom Reduce/Aggregate functions.
> > > >
> > > >
> > > > > 2. Speaking more generally, could you list the motivating use
> cases,
> > > e.g.
> > > > > existing Flink DataStream Operators, custom operators, SQL
> operators?
> > > > > If the impact depends on the operator, it would be helpful to
> > > understand
> > > > > how the existing operators would be affected.
> > > > >
> > > >
> > > > I got the idea of this enhancement based on a real use case at my
> work. I
> > > > have captured the scenarios in THIS
> > > > <https://github.com/soumitrak/flink_streaming_event_reordering>
> repo.
> > > > Essentially, the scenario is to reorder events based on event time,
> and
> > > > process the ordered events every minute. In this case, I am building
> a
> > > > sorted list using an associative operator that implements merging two
> > > > sorted lists.
> > > >
> > > > In general, the proposal should benefit write-heavy (less read)
> scenarios
> > > > where the computation can be a Reduce/Aggregate function. The
> existing
> > > > state variables do read-modify-write and become a bottleneck for
> > > ingestion
> > > > for write-heavy use cases. Check the performance using the prototype
> > > > implementation HERE
> > > > <
> > > >
> > >
> https://github.com/soumitrak/flink_streaming_event_reordering#performance-comparison
> > > > >
> > > > .
> > > >
> > > >
> > > > > 3. Could you clarify how the Reduce/Aggregate function calls will
> be
> > > > > dispatched?
> > > > > In theory, if an operator has two such states, their states will
> end up
> > > > in
> > > > > the same RocksDB instance and we need to distinguish them, right?
> > > >
> > > >
> > > > RocksDB allows one merge operator per ColumnFamily, and AFAIU, Flink
> > > > creates one ColumnFamily per state variable. Currently, Flink sets
> the
> > > > "stringappendtest"
> > > > <
> > > >
> > >
> https://github.com/apache/flink/blob/6c756a084d40f4716cdfdb4fdd71b0c919c448e6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java#L133
> > > > >
> > > > merge operator for all ColumnFamilies. In this proposal, Flink can
> set a
> > > > user provided Java based Reduce/Aggregate function as the merge
> operator.
> > > > In your example, if the operator has two state variables, then there
> will
> > > > be two ColumnFamilies, and Flink can set a custom ReduceFunction for
> one
> > > > variable and a custom AggregateFunction for the other variable. We
> don't
> > > > need to change anything at this layer in Flink/RocksDB.
> > > >
> > > >
> > > > > 4. Function dispatch during restore
> > > > > IIRC, aggregate functions might not be available during restore
> > > > operation -
> > > > > because state descriptors containing these functions are set later.
> > > > > Is this a concern? Do we need to disable compactions during
> restore?
> > > > >
> > > >
> > > > Great point! The Reduce/Aggregate functions in Flink are already
> > > > serializable and in the proposal they are serialized in the
> savepoint and
> > > > are restored when rocksdb is loaded. This allows rocksdb to call
> these
> > > > functions during compaction before state descriptors are called.
> This way
> > > > we don't need to disable compaction during restore.
> > > >
> > > >
> > > > > 5. Remote compactions
> > > > > In the light of Disaggregated State Backend, Flink might benefit
> from
> > > > > remote compactions.
> > > > > This may help with predictability (no compaction impact on
> processing)
> > > > and
> > > > > achieving better cost/utilization.
> > > > > AFAIK ForSt doesn't support it ATM, but it would be great to keep
> this
> > > > > possibility open.
> > > > > And I think the current proposal will make remote compactions
> > > > problematic.
> > > > >
> > > >
> > > > TBH, I don't understand ForSt in detail to comment on this item.
> Since
> > > the
> > > > proposal is exposing associative merge operators, it should not be an
> > > issue
> > > > to support in ForSt. In fact, if ForSt does not support associative
> merge
> > > > operators, then I will volunteer to add it, but let's get this
> proposal
> > > > first.
> > > >
> > > >
> > > > > 6. Alternative: ListState
> > > > > Have you considered using ListState to accumulate the records; and
> then
> > > > > reduce/aggregate on read (or incrementally on write)?
> > > > > Basically, doing the same thing as you proposed but purely in Java.
> > > > >
> > > > > I think it should address all the concerns I mentioned above; plus
> > > > > - removes the associative requirement (because we control the
> order)
> > > > > - removes RocksDB dependency
> > > > > - gives some flexibility when to aggregate (incrementally on write
> > > and/or
> > > > > on read)
> > > > >
> > > > > (I couldn't find this in the Rejected Alternatives, sorry if I
> missed
> > > > > that).
> > > > >
> > > >
> > > > ListState uses the "stringappendtest" merge operator, defined in the
> C++
> > > > layer in FrocksDB, that concatenates strings using comma char. I can
> > > > implement the sorted list using this construct, but the read will be
> more
> > > > expensive than the proposal, since the sorting will happen during the
> > > read.
> > > > The proposal adds support for pure Java based associative merge
> operators
> > > > that will be invoked by RocksDB using JNI. There will be JNI
> overhead,
> > > but
> > > > it is not on the write thread. The write thread is as performant as
> the
> > > > ListState implementation. I have a prototype implementation that
> reduces
> > > > the JNI overhead. I think this can be a great addition to Flink and,
> just
> > > > like other advanced features, it requires understanding/tuning of
> > > rocksdb.
> > > >
> > > > Best,
> > > > -Soumitra.
> > > >
> > > >
> > > > >
> > > > > On Tue, May 26, 2026 at 3:50 PM Zakelly Lan <[email protected]
> >
> > > > wrote:
> > > > >
> > > > > > Hi Soumitra,
> > > > > >
> > > > > > I'm somehow getting confused. I was expecting there is no public
> API
> > > > > > change in Flink side, and users could use the original state
> > > > descriptors
> > > > > to
> > > > > > create ReducingState/AggregatingState. And in the implementation
> > > side,
> > > > > > taking RocksDBReducingState as example, a wrapper of user defined
> > > > > > ReducingFunction that extends the
> AbstractAssociativeMergeOperator is
> > > > > > provided to the frocksdb side. This may be a simple way of
> > > > optimization,
> > > > > > offering no additional new use cases. I may have missed some
> > > > information,
> > > > > > so please correct me if I'm wrong.
> > > > > >
> > > > > >
> > > > > > And BTW, CC to @roman <[email protected]> and @Yuan Mei
> > > > > > <[email protected]> as you may be interested in this topic.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > > On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <
> > > > > [email protected]>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi Zakelly,
> > > > > >>
> > > > > >> I have removed the ReducingMergeState.set method to minimize the
> > > > changes
> > > > > >> to
> > > > > >> public APIs. The modified FLIP is at the same -
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
> > > > > >>
> > > > > >> Community members,
> > > > > >> This proposal is a great addition to FrocksDB/Flink to enable
> > > > > applications
> > > > > >> to harness the power of RocksDB. This addition will help
> write-heavy
> > > > > >> workloads and let users build associative data structures in a
> > > > streaming
> > > > > >> fashion.
> > > > > >>
> > > > > >> Please review, and I can answer any question.
> > > > > >>
> > > > > >> Best,
> > > > > >> -Soumitra.
> > > > > >>
> > > > > >> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <
> > > > > [email protected]>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hi Zakelly,
> > > > > >> >
> > > > > >> > Thanks for your review and +1!
> > > > > >> >
> > > > > >> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <
> > > [email protected]>
> > > > > >> wrote:
> > > > > >> >
> > > > > >> >> Thanks for the FLIP! I'd +1 for the direction of enhancing
> the
> > > > > reducing
> > > > > >> >> and
> > > > > >> >> aggregating state. It's important that we could leverage
> > > Rocksdb's
> > > > > >> merge
> > > > > >> >> operators to eliminate unnecessary `get()`. However I have a
> few
> > > > > >> >> questions:
> > > > > >> >>
> > > > > >> >> 1. I see you will introduce
> `AbstractAssociativeMergeOperator` in
> > > > > >> >> frocksdb side, so how could user pass this instance to the
> > > RocksDB
> > > > > >> State
> > > > > >> >> backend and what is the relationship with the Flink's
> > > > > `ReduceFunction`
> > > > > >> or
> > > > > >> >> `AggregateFunction`. I would suggest we may 'translate'
> user's
> > > > > >> >> `ReduceFunction`  into some frocksdb's merge operator, thus
> for
> > > > flink
> > > > > >> we
> > > > > >> >> still maintain the original experience. WDYT?
> > > > > >> >>
> > > > > >> >
> > > > > >> > Yes, that is the whole idea. Since ReduceFunction and
> > > > > AggregateFunction
> > > > > >> > are existing primitives and they are associative and
> > > serializable, I
> > > > > am
> > > > > >> > building on them.
> > > > > >> >
> > > > > >> > This is how they are wired. There are two new classes
> > > > > >> > RocksDBReducingMergeOperator, RocksDBAggregatingMergeOperator
> > > > > >> implementing
> > > > > >> > AbstractAssociativeMergeOperator in Flink. The ColumnFamily is
> > > > > >> configured
> > > > > >> > with one of them. RocksDBReducingMergeOperator for
> > > > ReducingMergeState,
> > > > > >> and
> > > > > >> > RocksDBAggregatingMergeOperator for AggregatingMergeState
> types.
> > > > These
> > > > > >> > classes get the callback from frocksdb, handle the serde, and
> call
> > > > the
> > > > > >> user
> > > > > >> > defined ReduceFunction and AggregateFunction.
> > > > > >> >
> > > > > >> >
> > > > > >> >>
> > > > > >> >> 2. I read you introduce a new `ReducingMergeState` with a new
> > > > `set()`
> > > > > >> >> method compared with `ReducingState`, is this necessary? I
> mean
> > > if
> > > > we
> > > > > >> >> intend to optimize performance solely through the use of a
> merge
> > > > > >> operator,
> > > > > >> >> this is not necessary, right? I do not recommend introducing
> too
> > > > many
> > > > > >> >> public APIs, as this would force us to consider their
> > > > > >> >> semantics. Specifically for example, in changelog stream
> > > > processing,
> > > > > if
> > > > > >> >> the
> > > > > >> >> `merge` operation were to permit state-setting operations, it
> > > would
> > > > > >> >> complicate potential future retraction (or reverse
> > > > merge/aggregating)
> > > > > >> >> operations. WDYT?
> > > > > >> >>
> > > > > >> >
> > > > > >> > ReducingMergeState.set is just a shortcut for clear-add. I
> agree
> > > > with
> > > > > >> your
> > > > > >> > point, I will remove it.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > -Soumitra.
> > > > > >> >
> > > > > >> >
> > > > > >> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar <
> > > > > >> [email protected]>
> > > > > >> >> wrote:
> > > > > >> >>
> > > > > >> >> > Dear Community Members,
> > > > > >> >> >
> > > > > >> >> > I want to start discussion on the two tickets I filed
> recently:
> > > > > >> >> > Add support for Java based AssociativeMergeOperator via JNI
> > > > > >> >> > <https://issues.apache.org/jira/browse/FLINK-39455>
> > > > > >> >> > Support ReducingMergeState and AggregatingMergeState
> backed by
> > > > Java
> > > > > >> >> based
> > > > > >> >> > associative merge operators
> > > > > >> >> > <https://issues.apache.org/jira/browse/FLINK-39456>
> > > > > >> >> >
> > > > > >> >> > Copying the motivation from the FLIP doc
> > > > > >> >> > <
> > > > > >> >> >
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
> > > > > >> >> > >
> > > > > >> >> > :
> > > > > >> >> >
> > > > > >> >> > Flink supports RocksDBReducingState and
> RocksDBAggregatingState
> > > > > state
> > > > > >> >> > variables that do a synchronous read-modify-write on every
> add
> > > > > call.
> > > > > >> >> While
> > > > > >> >> > this works great in many scenarios, for write-heavy
> workloads
> > > > this
> > > > > >> can
> > > > > >> >> be
> > > > > >> >> > expensive and may become a bottleneck.
> > > > > >> >> > RocksDB's AssociativeMergeOperator is a storage-level
> primitive
> > > > > >> designed
> > > > > >> >> > for commutative and associative operations — integer
> counters,
> > > > set
> > > > > >> >> union,
> > > > > >> >> > list append, approximate sketches, top-K structures, Bloom
> > > > filter,
> > > > > >> and
> > > > > >> >> > similar patterns. However, frocksdb (the RocksDB fork used
> in
> > > > > Flink)
> > > > > >> >> does
> > > > > >> >> > not support Java based associative merge operators.
> > > > > >> >> >
> > > > > >> >> > This FLIP has two parts:
> > > > > >> >> > 1. Support for Java based AssociativeMergeOperator in
> frocksdb
> > > > via
> > > > > >> JNI
> > > > > >> >> > 2. Support ReducingMergeState and AggregatingMergeState
> backed
> > > by
> > > > > >> Java
> > > > > >> >> > based associative merge operators
> > > > > >> >> >
> > > > > >> >> > The first part proposes exposing the associative merge
> operator
> > > > as
> > > > > a
> > > > > >> >> Java
> > > > > >> >> > class in frocksdb with minimal JNI overhead. RocksDB can
> call
> > > > these
> > > > > >> >> > operators during flushing and compaction.
> > > > > >> >> > The second part leverages the frocksdb support developed
> in the
> > > > > first
> > > > > >> >> part
> > > > > >> >> > to support ReducingMergeState and AggregatingMergeState
> state
> > > > > >> variables
> > > > > >> >> > with user defined ReduceFunction and AggregateFunction
> using
> > > > > rocksdb
> > > > > >> >> > backend.
> > > > > >> >> >
> > > > > >> >> > This enhancement opens up a powerful feature of rocksdb to
> > > Java.
> > > > > >> Flink
> > > > > >> >> > users can use it to build interesting associative data
> > > structures
> > > > > >> >> > on streaming data. I have added benchmark details from a
> > > > prototype
> > > > > >> >> > implementation in the FLIP doc.
> > > > > >> >> >
> > > > > >> >> > Looking forward to feedback.
> > > > > >> >> >
> > > > > >> >> > FLIP in Google doc
> > > > > >> >> > <
> > > > > >> >> >
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >> > Best,
> > > > > >> >> > -Soumitra.
> > > > > >> >> >
> > > > > >> >>
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
>

Reply via email to