Thanks everyone for the feedback!

Rui, Zakelly, and I discussed the configuration offline and agreed to keep
the options, mark them @Experimental, and choose the defaults based on the
state backend type.
For that, the KeyedStateStore interface needs to be extended.
I've updated the FLIP to reflect this.
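
As a rough illustration of backend-dependent defaults (a sketch only: the
class, enum, and method names are hypothetical and come neither from the FLIP
nor from Flink, the numbers are the ones from Rui's pseudocode quoted further
down, and the exact >= / <= comparisons are an assumption), the default
selection and the per-key switch could look roughly like this:

```java
// Hypothetical sketch: none of these names come from Flink or the FLIP.
final class AdaptiveThresholdSketch {

    /** Kind of state backend the operator runs on (assumed to be known at setup time). */
    enum BackendKind { ROCKSDB, HASHMAP }

    /** Per-key storage mode, as named in the thread. */
    enum Mode { VALUE, MAP }

    final int high; // list size at which a key moves from VALUE to MAP
    final int low;  // list size at which a key moves back from MAP to VALUE

    AdaptiveThresholdSketch(int high, int low) {
        if (low >= high) {
            throw new IllegalArgumentException("low threshold must be below high threshold");
        }
        this.high = high;
        this.low = low;
    }

    /** Backend-specific defaults; the numbers are the ones from the quoted pseudocode. */
    static AdaptiveThresholdSketch defaultsFor(BackendKind kind) {
        return kind == BackendKind.ROCKSDB
                ? new AdaptiveThresholdSketch(50, 40)
                : new AdaptiveThresholdSketch(400, 300);
    }

    /** User-supplied values (null means "not configured") override the backend defaults. */
    static AdaptiveThresholdSketch resolve(BackendKind kind, Integer userHigh, Integer userLow) {
        AdaptiveThresholdSketch defaults = defaultsFor(kind);
        return new AdaptiveThresholdSketch(
                userHigh != null ? userHigh : defaults.high,
                userLow != null ? userLow : defaults.low);
    }

    /** Hysteresis: the mode changes only when the list size crosses the far threshold. */
    Mode nextMode(Mode current, int listSize) {
        if (current == Mode.VALUE && listSize >= high) {
            return Mode.MAP;
        }
        if (current == Mode.MAP && listSize <= low) {
            return Mode.VALUE;
        }
        return current;
    }

    public static void main(String[] args) {
        AdaptiveThresholdSketch t = resolve(BackendKind.ROCKSDB, null, null);
        Mode mode = Mode.VALUE;
        for (int size : new int[] {10, 50, 45, 40}) {
            mode = t.nextMode(mode, size);
            System.out.println("size=" + size + " -> " + mode);
        }
    }
}
```

Keeping a gap between the two thresholds avoids flapping between modes when a
key's list size hovers around a single cut-off.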

I'm going to start a voting thread soon unless there are other suggestions
or objections.

Regards,
Roman

On Fri, Sep 12, 2025, 14:21 Lincoln Lee <[email protected]> wrote:

> +1 to the optimization plan and to marking the new options as experimental.
>
> Optimizing state access after data expansion will help ease performance
> bottlenecks when this operator can’t be fully avoided.
>
> Thanks for pushing this forward!
>
> Best,
> Lincoln Lee
>
>
> On Fri, Sep 12, 2025 at 16:38, Roman Khachatryan <[email protected]> wrote:
>
> > Hey Rui,
> >
> > Thanks for clarifying
> >
> > In my opinion, the flexibility is necessary here for a number of reasons
> > (e.g. payload size may vary per job, TM disk cache size is different,
> > even user priorities might be different).
> >
> > We can mark these options as @Experimental for now to be able to change
> > them in the future. And/or, the improved switching algorithm can be just
> > a new strategy, without the need to touch these options.
> >
> > > I don't have a strong opinion on this, if it's really necessary to
> > > retain flexibility.
> >
> > In that case, I'd like to start the vote today or on Monday unless
> > there's any further feedback.
> >
> > Regards,
> > Roman
> >
> > On Fri, Sep 12, 2025, 10:07 Rui Fan <[email protected]> wrote:
> >
> > > Hey Roman,
> > >
> > > Thanks for your explanation of the first 2 questions, it is clear to
> > > me!
> > >
> > > About the 3rd question:
> > >
> > > > I totally agree that configuration options add complexity and
> > > > increase maintenance burden, but in my opinion this is a trade-off
> > > > between flexibility and complexity.
> > >
> > > Totally agree that it is a trade-off.
> > >
> > > > In this case, I don't see how the minimum flexibility can be achieved
> > > > without adding those options. On the other hand, providing sensible
> > > > defaults makes it unnecessary to adjust these settings for most of
> > > > the users.
> > >
> > > IIUC, it is hard to provide unified sensible defaults since the RocksDB
> > > and HashMap state backends expect different thresholds.
> > >
> > > > Could you elaborate on what you mean by list size and how it is
> > > > different from threshold.high and threshold.low proposed in the FLIP?
> > >
> > > What I mean is that we do not expose the threshold.high and
> > > threshold.low configuration options, but hard-code them inside the
> > > Flink code to select different thresholds depending on whether RocksDB
> > > or HashMap is used.
> > >
> > > Following is the pseudocode:
> > >
> > > ```
> > > private static final int ROCKSDB_HIGH_THRESHOLD = 50;
> > > private static final int ROCKSDB_LOW_THRESHOLD = 40;
> > > private static final int HASHMAP_HIGH_THRESHOLD = 400;
> > > private static final int HASHMAP_LOW_THRESHOLD = 300;
> > >
> > > int highThreshold = isRocksdb ? ROCKSDB_HIGH_THRESHOLD : HASHMAP_HIGH_THRESHOLD;
> > > int lowThreshold = isRocksdb ? ROCKSDB_LOW_THRESHOLD : HASHMAP_LOW_THRESHOLD;
> > > ```
> > >
> > > In the short term, users lose flexibility, but the code and usability
> > > are simpler.
> > > Most users do not need to adjust thresholds for HashMap and RocksDB.
> > >
> > > In the long term, if Flink wants to introduce switching strategies
> > > other than thresholds, it can switch directly internally without
> > > considering the compatibility of configuration options.
> > >
> > > I don't have a strong opinion on this, if it's really necessary to
> > > retain flexibility.
> > >
> > > Best,
> > > Rui
> > >
> > > On Thu, Sep 11, 2025 at 11:02 PM Roman Khachatryan <[email protected]>
> > > wrote:
> > >
> > > > Hey Rui,
> > > >
> > > > Thanks for your feedback and questions!
> > > >
> > > > > 1. State Compatibility:
> > > >
> > > > > I would like to confirm if the underlying stored
> > > > > data structure is the same in the following two scenarios:
> > > >
> > > > > - A ValueState stored in LEGACY mode.
> > > > > - A ValueState stored in VALUE mode (as used by the ADAPTIVE
> > > > >   strategy when below the threshold).
> > > >
> > > > > The FLIP mentions that VALUE is similar to LEGACY, but I do not
> > > > > fully know what the difference is. If their physical storage format
> > > > > is identical, it seems there might be an opportunity to provide
> > > > > state compatibility.
> > > >
> > > > No, in these two cases, the state is not compatible. This is because
> > > > in VALUE mode, a timestamp is stored along with every record, and the
> > > > metadata stores the equalizer and hash function generators.
> > > >
> > > > The FLIP targets only new jobs, for simplicity and to reduce risk
> > > > (during state migration). A migration path can be added in the future.
> > > >
> > > > > 2. Forward Compatibility for TTL
> > > >
> > > > > The FLIP mentions that TTL is a follow-up feature, that "storing
> > > > > the timestamps along with every record will be implemented from the
> > > > > get go".
> > > > > This implies a future state schema evolution. Could you elaborate
> > > > > on how the V1 serializer will be designed to handle this, ensuring
> > > > > that adopting V1 of this feature won't require another state
> > > > > migration when TTL support is added in V2?
> > > >
> > > > There's no migration necessary to add TTL support for V2: the
> > > > timestamps will already be there, and there's no need for extra data.
> > > > For V1 (the current implementation), a migration path would be needed.
> > > >
> > > > > 3. Adaptive Threshold Configuration
> > > >
> > > > > FLIP introduces adaptive.threshold.high and adaptive.threshold.low.
> > > > > This adds a configuration burden on users. To set these values
> > > > > correctly, users need to understand the underlying performance
> > > > > characteristics.
> > > > > For example, the FLIP recommends very different thresholds for
> > > > > RocksDB and Heap backends, which highlights this complexity.
> > > >
> > > > > Furthermore, from historical experience, it is easy to expose a new
> > > > > configuration, but much harder to change or remove it later due to
> > > > > user compatibility concerns.
> > > > > If we don't expose these parameters now, it would allow the
> > > > > internal adaptive strategy to be improved in the future without any
> > > > > compatibility issues.
> > > >
> > > > I totally agree that configuration options add complexity and
> > > > increase maintenance burden, but in my opinion this is a trade-off
> > > > between flexibility and complexity.
> > > > In this case, I don't see how the minimum flexibility can be achieved
> > > > without adding those options. On the other hand, providing sensible
> > > > defaults makes it unnecessary to adjust these settings for most of
> > > > the users.
> > > >
> > > > > In the first version, I think list size might be enough as an
> > > > > internal strategy, and we can dynamically choose different
> > > > > thresholds for heap and rocksdb.
> > > >
> > > > Could you elaborate on what you mean by list size and how it is
> > > > different from threshold.high and threshold.low proposed in the FLIP?
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Wed, Sep 3, 2025 at 12:32 PM Rui Fan <[email protected]> wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > Thanks for driving this great proposal and valuable discussion!
> > > > > I have a few questions related to compatibility and usability.
> > > > >
> > > > > 1. State Compatibility:
> > > > >
> > > > > My first question is about the compatibility of state between the
> > > > > old and new modes, which is crucial for migrating existing jobs. I
> > > > > would like to confirm if the underlying stored data structure is
> > > > > the same in the following two scenarios:
> > > > >
> > > > > - A ValueState stored in LEGACY mode.
> > > > > - A ValueState stored in VALUE mode (as used by the ADAPTIVE
> > > > >   strategy when below the threshold).
> > > > >
> > > > > The FLIP mentions that VALUE is similar to LEGACY, but I do not
> > > > > fully know what the difference is. If their physical storage format
> > > > > is identical, it seems there might be an opportunity to provide
> > > > > state compatibility.
> > > > >
> > > > > 2. Forward Compatibility for TTL
> > > > >
> > > > > The FLIP mentions that TTL is a follow-up feature, that "storing
> > > > > the timestamps along with every record will be implemented from the
> > > > > get go".
> > > > > This implies a future state schema evolution. Could you elaborate
> > > > > on how the V1 serializer will be designed to handle this, ensuring
> > > > > that adopting V1 of this feature won't require another state
> > > > > migration when TTL support is added in V2?
> > > > >
> > > > > 3. Adaptive Threshold Configuration
> > > > >
> > > > > FLIP introduces adaptive.threshold.high and adaptive.threshold.low.
> > > > > This adds a configuration burden on users. To set these values
> > > > > correctly, users need to understand the underlying performance
> > > > > characteristics.
> > > > > For example, the FLIP recommends very different thresholds for
> > > > > RocksDB and Heap backends, which highlights this complexity.
> > > > >
> > > > > Furthermore, from historical experience, it is easy to expose a new
> > > > > configuration, but much harder to change or remove it later due to
> > > > > user compatibility concerns.
> > > > > If we don't expose these parameters now, it would allow the
> > > > > internal adaptive strategy to be improved in the future without any
> > > > > compatibility issues.
> > > > >
> > > > > For example, future policies could be based not only on list length
> > > > > but also on the size of RowData in their cost models. Furthermore,
> > > > > operators could even monitor the time it takes to access state when
> > > > > processing a specific key, triggering a mode switch only when
> > > > > latency exceeds a dynamic baseline. This not only makes operators
> > > > > truly "adaptive" but also greatly simplifies user configuration.
> > > > >
> > > > > In the first version, I think list size might be enough as an
> > > > > internal strategy, and we can dynamically choose different
> > > > > thresholds for heap and rocksdb.
> > > > >
> > > > > Looking forward to your feedback!
> > > > >
> > > > > Best,
> > > > > Rui
> > > > >
> > > > > On Mon, Sep 1, 2025 at 3:13 PM Piotr Nowojski <[email protected]> wrote:
> > > > >
> > > > > > Thanks Roman for driving this. I highly support this effort.
> > > > > >
> > > > > > Big +1 from my side.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > On Sun, Aug 31, 2025 at 21:58, Roman Khachatryan <[email protected]> wrote:
> > > > > >
> > > > > > > Hi Zakelly,
> > > > > > >
> > > > > > > > So I assume there is a conditional branch to determine whether
> > > > > > > > current stream key is in VALUE or MAP mode, and this also
> > > > > > > > involves some state access right?
> > > > > > >
> > > > > > > Yes, that's a good question; it might add one more state access.
> > > > > > > To mitigate that to some extent, the results of the 1st access to
> > > > > > > ValueState can be cached and be used later for add/retract (in
> > > > > > > case it's not empty).
> > > > > > > (such caching is already included in the benchmark results)
> > > > > > >
> > > > > > > > Have you evaluated the implementation relying on the
> > > > > > > > orderliness of RocksDB's kv? RocksDB's
> > > > > > > > `MapState.iterator().next()` retrieves the first entry of the
> > > > > > > > map in binary order. We could reverse the order of seq by
> > > > > > > > generating it from Long.MAX to 0, and thus the first entry
> > > > > > > > retrieved would be the last one added. I see you mention this
> > > > > > > > in 'Rejected Alternatives' but I'm still curious whether this
> > > > > > > > could achieve an improvement. To my knowledge, the iterator
> > > > > > > > might be less efficient since it could not leverage bloom
> > > > > > > > filters as point-lookup does and the cache is unnecessary if
> > > > > > > > only the first entry is needed (of course we could remove that
> > > > > > > > cache). It's not immediately clear which approach is better,
> > > > > > > > as each has its trade-offs.
> > > > > > >
> > > > > > > We tried a similar approach of using an iterator. The benefits
> > > > > > > are diminished there by slow iteration (plus, iterator creation
> > > > > > > is also slow). Because of that, the performance was similar to
> > > > > > > the current implementation. We didn't compare the two approaches
> > > > > > > side by side though.
> > > > > > >
> > > > > > > > I’d also suggest testing scenarios with retraction rates below
> > > > > > > > 100%, as that may better reflect real-world workloads IIUC.
> > > > > > >
> > > > > > > I mostly focused on 100% because that's where I saw regression:
> > > > > > > the fewer retractions, the longer the list, the worse performance
> > > > > > > of ValueState.
> > > > > > > I'll re-run the benchmark with lower rates.
> > > > > > >
> > > > > > > > I think it's a good chance to push forward the
> > > > > > > > discussion/design of binary sorted map state (FLIP-220)[1]
> > > > > > > > since this seems to be a good application scenario. But I also
> > > > > > > > think it's acceptable if we do some hack to only rely on
> > > > > > > > RocksDB's state implicitly rather than waiting for the new
> > > > > > > > state primitives, if it is truly beneficial.
> > > > > > >
> > > > > > > I agree, probably not for this FLIP (because of the above
> > > > > > > reasons), but for many other cases it would be definitely
> > > > > > > beneficial to expose the sortedness of RocksDB in some way.
> > > > > > > Multi-way join (FLIP-516) [1] is one such example.
> > > > > > >
> > > > > > > > By saying event-time based TTL, I meant to make it easier for
> > > > > > > > users to understand. The event-time could be defined and
> > > > > > > > controlled/advanced by user (SQL implementor or Flink user).
> > > > > > > > e.g. Event-time could be watermark progression or just record
> > > > > > > > sequence (state will live through specific number of records),
> > > > > > > > or even be advanced by special control records for Datastream
> > > > > > > > users. This kind of user-controlled time advancement is what I
> > > > > > > > said "manually controllable". Such flexibility could be broadly
> > > > > > > > beneficial.
> > > > > > >
> > > > > > > Thanks for clarifying, special control records is an interesting
> > > > > > > idea, and I think it should be easy to implement.
> > > > > > >
> > > > > > > > We’ve encountered cases where state expiration is inconsistent
> > > > > > > > between upstream and downstream SQL operators. With event-time
> > > > > > > > based TTL, operators could share a synchronized notion of time,
> > > > > > > > allowing them to expire state in a more coordinated way.
> > > > > > >
> > > > > > > Yeah, that's a pity that Flink doesn't have event-time TTL. But
> > > > > > > to solve cross-operator TTL inconsistency, we'd need to change
> > > > > > > multiple operators (or state backend). I'm not sure we can
> > > > > > > efficiently support event time TTL for a general case.
> > > > > > >
> > > > > > > P.S.: have a good vacation! :)
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator
> > > > > > >
> > > > > > > Regards,
> > > > > > > Roman
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi Roman,
> > > > > > > >
> > > > > > > > Thanks for the answers! Please see my comments below:
> > > > > > > >
> > > > > > > > > The switch is performed under each stream key individually,
> > > > > > > > > when its specific list size reaches a threshold.
> > > > > > > >
> > > > > > > > So I assume there is a conditional branch to determine whether
> > > > > > > > current stream key is in VALUE or MAP mode, and this also
> > > > > > > > involves some state access right?
> > > > > > > >
> > > > > > > > > Yes, those pointers are necessary, I couldn't find a way to
> > > > > > > > > get rid of them:
> > > > > > > > > - prevSeqNo is used to emit penultimate element when
> > > > > > > > >   retracting the last one;
> > > > > > > > > - nextSeqNo is used to keep prevSeqNo correct when retracting
> > > > > > > > >   an item from the middle
> > > > > > > >
> > > > > > > > Oh I see. Have you evaluated the implementation relying on the
> > > > > > > > orderliness of RocksDB's kv? RocksDB's
> > > > > > > > `MapState.iterator().next()` retrieves the first entry of the
> > > > > > > > map in binary order. We could reverse the order of seq by
> > > > > > > > generating it from Long.MAX to 0, and thus the first entry
> > > > > > > > retrieved would be the last one added. I see you mention this
> > > > > > > > in 'Rejected Alternatives' but I'm still curious whether this
> > > > > > > > could achieve an improvement. To my knowledge, the iterator
> > > > > > > > might be less efficient since it could not leverage bloom
> > > > > > > > filters as point-lookup does and the cache is unnecessary if
> > > > > > > > only the first entry is needed (of course we could remove that
> > > > > > > > cache). It's not immediately clear which approach is better,
> > > > > > > > as each has its trade-offs.
> > > > > > > > I’d also suggest testing scenarios with retraction rates below
> > > > > > > > 100%, as that may better reflect real-world workloads IIUC.
> > > > > > > >
> > > > > > > > I think it's a good chance to push forward the
> > > > > > > > discussion/design of binary sorted map state (FLIP-220)[1]
> > > > > > > > since this seems to be a good application scenario. But I also
> > > > > > > > think it's acceptable if we do some hack to only rely on
> > > > > > > > RocksDB's state implicitly rather than waiting for the new
> > > > > > > > state primitives, if it is truly beneficial.
> > > > > > > >
> > > > > > > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> > > > > > > >
> > > > > > > > > I agree, I think that event-time based TTL is more useful in
> > > > > > > > > general (I specified processing time as a default to make it
> > > > > > > > > less surprising for the users).
> > > > > > > > >
> > > > > > > > > I don't immediately see the potential usages of a manually
> > > > > > > > > controllable TtlTimeProvider - do you have any use cases in
> > > > > > > > > mind?
> > > > > > > >
> > > > > > > > By saying event-time based TTL, I meant to make it easier for
> > > > > > > > users to understand. The event-time could be defined and
> > > > > > > > controlled/advanced by user (SQL implementor or Flink user).
> > > > > > > > e.g. Event-time could be watermark progression or just record
> > > > > > > > sequence (state will live through specific number of records),
> > > > > > > > or even be advanced by special control records for Datastream
> > > > > > > > users. This kind of user-controlled time advancement is what I
> > > > > > > > said "manually controllable". Such flexibility could be broadly
> > > > > > > > beneficial.
> > > > > > > >
> > > > > > > > We’ve encountered cases where state expiration is inconsistent
> > > > > > > > between upstream and downstream SQL operators. With event-time
> > > > > > > > based TTL, operators could share a synchronized notion of time,
> > > > > > > > allowing them to expire state in a more coordinated way.
> > > > > > > >
> > > > > > > >
> > > > > > > > Looking forward to your reply!  P.S. I'm on vacation for the
> > > > > > > > next few days, so I'll follow up later :) .
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Hi Zakelly,
> > > > > > > > >
> > > > > > > > > Thanks for the feedback!
> > > > > > > > >
> > > > > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the
> > > > > > > > > > switch between VALUE and MAP performed under each stream key
> > > > > > > > > > considering each list size, or is it performed for all keys
> > > > > > > > > > if the average list size reaches the given thresholds?
> > > > > > > > >
> > > > > > > > > The switch is performed under each stream key individually,
> > > > > > > > > when its specific list size reaches a threshold.
> > > > > > > > >
> > > > > > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and
> > > > > > > > > > 'nextSeqNo' to link all the nodes? I assume there should be
> > > > > > > > > > a traversal need but I don't see that in pseudo-code.
> > > > > > > > >
> > > > > > > > > Yes, those pointers are necessary, I couldn't find a way to
> > > > > > > > > get rid of them:
> > > > > > > > > - prevSeqNo is used to emit penultimate element when
> > > > > > > > >   retracting the last one;
> > > > > > > > > - nextSeqNo is used to keep prevSeqNo correct when retracting
> > > > > > > > >   an item from the middle
> > > > > > > > >
> > > > > > > > > > And is `MapState.iterator` also feasible?
> > > > > > > > > Yes, in fact, the ADAPTIVE strategy uses an iterator to move
> > > > > > > > > the entries between MAP and VALUE.
> > > > > > > > >
> > > > > > > > > > 3. I see there are two `RowData` stored for one record, one
> > > > > > > > > > is in `rowToSqn` and another is in `sqnToNode`'s node. I
> > > > > > > > > > guess the first is for upsert-keys. Would it be optimized to
> > > > > > > > > > single copy for a non-upsert-key scenario?
> > > > > > > > >
> > > > > > > > > That's an interesting idea! I'll try to dig into it deeper
> > > > > > > > > when open-sourcing or as a follow-up.
> > > > > > > > >
> > > > > > > > > > 4. For the TTL mechanism part, I would suggest an
> > > > > > > > > > 'event-time based ttl', which allows the user to specify
> > > > > > > > > > insertion time for each insert/update operation and a
> > > > > > > > > > manually controllable `TtlTimeProvider` (instead of just
> > > > > > > > > > system time). This would be beneficial for many cases, WDYT?
> > > > > > > > >
> > > > > > > > > I agree, I think that event-time based TTL is more useful in
> > > > > > > > > general (I specified processing time as a default to make it
> > > > > > > > > less surprising for the users).
> > > > > > > > >
> > > > > > > > > I don't immediately see the potential usages of a manually
> > > > > > > > > controllable TtlTimeProvider - do you have any use cases in
> > > > > > > > > mind?
> > > > > > > > >
> > > > > > > > > > 5. Does the current RocksDB benchmark involve significant
> > > > > > > > > > state size and I/O pressure?
> > > > > > > > >
> > > > > > > > > No, in the micro-benchmark the state wasn't too big (in order
> > > > > > > > > of megabytes);
> > > > > > > > > It was bottlenecked by RocksDB put/get operations, however.
> > > > > > > > > I also performed a benchmark on a cluster with a larger state
> > > > > > > > > size (in order of gigabytes) and got similar results.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Roman
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Roman,
> > > > > > > > > >
> > > > > > > > > > Thanks for the proposal! The SinkUpsertMaterializer sometimes
> > > > > > > > > > becomes a bottleneck in our production, so I'd +1 to
> > > > > > > > > > optimize it. I have several questions regarding your design:
> > > > > > > > > >
> > > > > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the
> > > > > > > > > > switch between VALUE and MAP performed under each stream key
> > > > > > > > > > considering each list size, or is it performed for all keys
> > > > > > > > > > if the average list size reaches the given thresholds?
> > > > > > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and
> > > > > > > > > > 'nextSeqNo' to link all the nodes? I assume there should be
> > > > > > > > > > a traversal need but I don't see that in pseudo-code. And is
> > > > > > > > > > `MapState.iterator` also feasible?
> > > > > > > > > > 3. I see there are two `RowData` stored for one record, one
> > > > > > > > > > is in `rowToSqn` and another is in `sqnToNode`'s node. I
> > > > > > > > > > guess the first is for upsert-keys. Would it be optimized to
> > > > > > > > > > single copy for a non-upsert-key scenario?
> > > > > > > > > > 4. For the TTL mechanism part, I would suggest an
> > > > > > > > > > 'event-time based ttl', which allows the user to specify
> > > > > > > > > > insertion time for each insert/update operation and a
> > > > > > > > > > manually controllable `TtlTimeProvider` (instead of just
> > > > > > > > > > system time). This would be beneficial for many cases, WDYT?
> > > > > > > > > > 5. Does the current RocksDB benchmark involve significant
> > > > > > > > > > state size and I/O pressure?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Zakelly
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start a discussion about FLIP-544
> > > > > > > > > > > SinkUpsertMaterializer V2 [1].
> > > > > > > > > > >
> > > > > > > > > > > SinkUpsertMaterializer is an operator in Flink that
> > > > > > > > > > > reconciles out of order changelog events before sending
> > > > > > > > > > > them to an upsert sink. In some cases (that we see in our
> > > > > > > > > > > production), performance of this operator degrades
> > > > > > > > > > > exponentially, depending on the input data.
> > > > > > > > > > > This FLIP proposes a new implementation that is optimized
> > > > > > > > > > > for such cases and serves as a synchronization point for
> > > > > > > > > > > other efforts in that area.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to feedback.
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Roman
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> >
>
