Hey all,

Thanks for driving this great proposal and valuable discussion!
I have a few questions related to compatibility and usability.

1. State Compatibility:

The compatibility of state between the old and new modes, which is crucial
for migrating existing jobs. I would like to confirm if the underlying
stored
data structure is the same in the following two scenarios:

- A ValueState stored in LEGACY mode.
- A ValueState stored in VALUE mode (as used by the ADAPTIVE strategy
  when below the threshold).

The FLIP mentions that VALUE - similar to LEGACY, but I do not fully know
what the difference is. If their physical storage format is identical, it
seems
there might be an opportunity to provide state compatibility.

2. Forward Compatibility for TTL

The FLIP mentions that TTL is a follow-up feature, that "storing the
timestamps along with every record will be implemented from the get go".
This implies a future state schema evolution. Could you elaborate on how
the V1 serializer will be designed to handle this, ensuring that adopting
V1
of this feature won't require another state migration when TTL support is
added in V2?

3. Adaptive Threshold Configuration

FLIP introduces adaptive.threshold.high and adaptive.threshold.low.
This adds a configuration burden on users. To set these values correctly,
users need to understand the underlying performance characteristics.
For example, the FLIP recommends very different thresholds for RocksDB
and Heap backends, which highlights this complexity.

Furthermore, from historical experience, it is easy to expose a new
configuration,
but much harder to change or remove it later due to user compatibility
concerns.
If we don't expose these parameters now, it would allow the internal
adaptive
strategy to be improved in the future without any compatibility issues.

For example, future policies could be based not only on list length but
also on the
size of RowData in their cost models. Furthermore, operators could even
monitor
the time it takes to access state when processing a specific key,
triggering a mode
switch only when latency exceeds a dynamic baseline. This not only makes
operators truly "adaptive" but also greatly simplifies user configuration.

In the first version, I think list size might be enough as an internal
strategy,
and we can dynamically choose different thresholds for heap and rocksdb.

Looking forward to your feedback!

Best,
Rui

On Mon, Sep 1, 2025 at 3:13 PM Piotr Nowojski <[email protected]> wrote:

> Thanks Roman for driving this. I highly support this effort.
>
> Big +1 from my side.
>
> Best,
> Piotrek
>
> niedz., 31 sie 2025 o 21:58 Roman Khachatryan <[email protected]>
> napisał(a):
>
> > Hi Zakelly,
> >
> > > So I assume there is a conditional branch to determine whether current
> > > stream key is in VALUE or MAP mode, and this also involves some state
> > > access right?
> >
> > Yes, that's a good question; it might add one more state access.
> > To mitigate that to some extent, the results of the 1st access to
> > ValueState
> > can be cached and be used later for add/retract (in case it's not empty).
> > (such caching is already included in the benchmark results)
> >
> > > Have you evaluated the implementation relying on the orderliness
> > > of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the
> > first
> > > entry of the map in binary order. We could reverse the order of seq by
> > > generating it from Long.MAX to 0, and thus the first entry retrieved
> > would
> > > be the last one added. I see you mention this in 'Rejected
> Alternatives'
> > > but I'm still curious whether this could achieve an improvement. To
> > > my knowledge, the iterator might be less efficient since it could not
> > > leverage bloom filters as point-lookup does and the cache is
> unnecessary
> > if
> > > only the first entry is needed (of course we could remove that cache).
> > It's
> > > not immediately clear which approach is better, as each has its
> > trade-offs.
> >
> > We tried a similar approach of using an iterator. The benefits are
> > diminished there
> > by slow iteration (plus, iterator creation is also slow). Because of
> that,
> > the
> > performance was similar to the current implementation. We didn't compare
> > the
> > two approaches side by side though.
> >
> > > I’d also suggest testing scenarios with retraction rates below 100%, as
> > > that may better reflect real-world workloads IIUC.
> >
> > I mostly focused on 100% because that's where I saw regression:
> > the fewer retractions, the longer the list, the worse performance of
> > ValueState.
> > I'll re-run the benchmark with lower rates.
> >
> > > I think it's a good chance to push forward the discussion/design of
> > binary
> > > sorted map state (FLIP-220)[1] since this seems to be a good
> application
> > > scenario. But I also think it's acceptable if we do some hack to only
> > rely
> > > on RocksDB's state implicitly rather that waiting the new
> > > state primitives, if it is truly beneficial.
> >
> > I agree, probably not for this FLIP (because of the above reasons), but
> for
> > many other cases it would be definitely beneficial to expose the
> sortedness
> > of RocksDB in some way. Multi-way join (FLIP-516) [1] is one such
> example.
> >
> > > By saying event-time based TTL, I meant to make it easier for users to
> > > understand. The event-time could be defined and controlled/advanced by
> > user
> > > (SQL implementor or Flink user). e.g. Event-time could be watermark
> > > progression or just record sequence (state will live through specific
> > > number of records), or even be advanced by special control records for
> > > Datastream users. This kind of user-controlled time advancement is
> what I
> > > said "manually controllable". Such flexibility could be broadly
> > beneficial.
> >
> > Thanks for clarifying, special control records is an interesting idea,
> and
> > I think it should be easy to implement.
> >
> > > We’ve encountered cases where state expiration is inconsistent between
> > > upstream and downstream SQL operators. With event-time based TTL,
> > operators
> > > could share a synchronized notion of time, allowing them to expire
> state
> > in
> > > a more coordinated way.
> >
> > Yeah, that's a pity that Flink doesn't have event-time TTL. But to solve
> > cross-operator TTL inconsistency, we'd need to change multiple operators
> > (or
> > state backend). I'm not sure we can efficiently support event time TTL
> for
> > a
> > general case.
> >
> > P.S.: have a good vacation! :)
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan <[email protected]>
> wrote:
> >
> > > Hi Roman,
> > >
> > > Thanks for the answers! Please see my comments below:
> > >
> > > The switch is performed under each stream key individually, when
> > > > its specific list size
> > > > reaches a threshold.
> > >
> > >
> > > So I assume there is a conditional branch to determine whether current
> > > stream key is in VALUE or MAP mode, and this also involves some state
> > > access right?
> > >
> > > Yes, those pointers are necessary, I couldn't find a way to get rid of
> > > them:
> > > > - prevSeqNo is used to emit penultimate element when retracing the
> last
> > > > one;
> > > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> > > from
> > > > the middle
> > >
> > >
> > > Oh I see. Have you evaluated the implementation relying on the
> > orderliness
> > > of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the
> > first
> > > entry of the map in binary order. We could reverse the order of seq by
> > > generating it from Long.MAX to 0, and thus the first entry retrieved
> > would
> > > be the last one added. I see you mention this in 'Rejected
> Alternatives'
> > > but I'm still curious whether this could achieve an improvement. To
> > > my knowledge, the iterator might be less efficient since it could not
> > > leverage bloom filters as point-lookup does and the cache is
> unnecessary
> > if
> > > only the first entry is needed (of course we could remove that cache).
> > It's
> > > not immediately clear which approach is better, as each has its
> > trade-offs.
> > > I’d also suggest testing scenarios with retraction rates below 100%, as
> > > that may better reflect real-world workloads IIUC.
> > >
> > > I think it's a good chance to push forward the discussion/design of
> > binary
> > > sorted map state (FLIP-220)[1] since this seems to be a good
> application
> > > scenario. But I also think it's acceptable if we do some hack to only
> > rely
> > > on RocksDB's state implicitly rather that waiting the new
> > > state primitives, if it is truly beneficial.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> > >
> > > I agree, I think that event-time based TTL is more useful in general
> > > > (I specified processing time as a default to make it less surprising
> > for
> > > > the users).
> > >
> > > I don't immediately see the potential usages of a manually controllable
> > > > TtlTimeProvider - do you have any use cases in mind?
> > >
> > >
> > > By saying event-time based TTL, I meant to make it easier for users to
> > > understand. The event-time could be defined and controlled/advanced by
> > user
> > > (SQL implementor or Flink user). e.g. Event-time could be watermark
> > > progression or just record sequence (state will live through specific
> > > number of records), or even be advanced by special control records for
> > > Datastream users. This kind of user-controlled time advancement is
> what I
> > > said "manually controllable". Such flexibility could be broadly
> > beneficial.
> > >
> > > We’ve encountered cases where state expiration is inconsistent between
> > > upstream and downstream SQL operators. With event-time based TTL,
> > operators
> > > could share a synchronized notion of time, allowing them to expire
> state
> > in
> > > a more coordinated way.
> > >
> > >
> > > Looking forward to your reply!  P.S. I'm on vacation for the next few
> > days,
> > > so I'll follow up later :) .
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <[email protected]>
> > > wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > > > between
> > > > > VALUE and MAP performed under each stream key considering each list
> > > size,
> > > > > or is it performed for all keys if the average list size reaches
> the
> > > > given
> > > > > thresholds?
> > > >
> > > > The switch is performed under each stream key individually, when
> > > > its specific list size
> > > > reaches a threshold.
> > > >
> > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > to
> > > > link
> > > > > all the nodes? I assume there should be a traversal need but I
> don't
> > > see
> > > > > that in pseudo-code.
> > > >
> > > > Yes, those pointers are necessary, I couldn't find a way to get rid
> of
> > > > them:
> > > > - prevSeqNo is used to emit penultimate element when retracing the
> last
> > > > one;
> > > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> > > from
> > > > the middle
> > > >
> > > > > And is `MapState.iterator` also feasible?
> > > > Yes, in fact, the ADAPTIVE strategy uses an iterator to move the
> > entries
> > > > between MAP and VALUE.
> > > >
> > > > > 3. I see there are two `RowData` stored for one record, one is in
> > > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first
> is
> > > for
> > > > > upsert-keys. Would it be optimized to single copy for a
> > non-upsert-key
> > > > > scenario?
> > > >
> > > > That's an interesting idea! I'll try to dig into it deeper when
> > > > open-sourcing or as a follow-up.
> > > >
> > > > > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > > ttl',
> > > > > which allows the user to specify insertion time for each
> > insert/update
> > > > > operation and a manually controllable `TtlTimeProvider` (instead of
> > > just
> > > > > system time). This would be beneficial for many cases, WDYT?
> > > >
> > > > I agree, I think that event-time based TTL is more useful in general
> > > > (I specified processing time as a default to make it less surprising
> > for
> > > > the users).
> > > >
> > > > I don't immediately see the potential usages of a manually
> controllable
> > > > TtlTimeProvider - do you have any use cases in mind?
> > > >
> > > > > 5. Does the current RocksDB benchmark involve significant state
> size
> > > and
> > > > > I/O pressure?
> > > >
> > > > No, in the micro-benchmark the state wasn't too big (in order of
> > > > megabytes);
> > > > It was bottlenecked by RocksDB put/get operations, however.
> > > > I also performed a benchmark on a cluster with a larger state size
> > > > (in order of gigabytes) and got similar results.
> > > >
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > >
> > > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi Roman,
> > > > >
> > > > > Thanks for the proposal! The SinkUpsertMaterializer sometimes
> > becomes a
> > > > > bottleneck in our production, so I'd +1 to optimize it. I have
> > several
> > > > > questions regarding your design:
> > > > >
> > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > > > between
> > > > > VALUE and MAP performed under each stream key considering each list
> > > size,
> > > > > or is it performed for all keys if the average list size reaches
> the
> > > > given
> > > > > thresholds?
> > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > to
> > > > link
> > > > > all the nodes? I assume there should be a traversal need but I
> don't
> > > see
> > > > > that in pseudo-code. And is `MapState.iterator` also feasible?
> > > > > 3. I see there are two `RowData` stored for one record, one is in
> > > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first
> is
> > > for
> > > > > upsert-keys. Would it be optimized to single copy for a
> > non-upsert-key
> > > > > scenario?
> > > > > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > > ttl',
> > > > > which allows the user to specify insertion time for each
> > insert/update
> > > > > operation and a manually controllable `TtlTimeProvider` (instead of
> > > just
> > > > > system time). This would be beneficial for many cases, WDYT?
> > > > > 5. Does the current RocksDB benchmark involve significant state
> size
> > > and
> > > > > I/O pressure?
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <
> [email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I would like to start a discussion about FLIP-544
> > > > SinkUpsertMaterializer
> > > > > V2
> > > > > > [1].
> > > > > >
> > > > > > SinkUpsertMaterializer is an operator in Flink that reconciles
> out
> > of
> > > > > order
> > > > > > changelog events before sending them to an upsert sink. In some
> > cases
> > > > > (that
> > > > > > we see in our production), performance of this operator degrades
> > > > > > exponentially, depending on the input data.
> > > > > > This FLIP proposes a new implementation that is optimized for
> such
> > > > cases
> > > > > > and serves as a synchronization point for other efforts in that
> > area.
> > > > > >
> > > > > > Looking forward to feedback.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Roman
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to