Thanks Roman for driving this. I highly support this effort.

Big +1 from my side.

Best,
Piotrek

On Sun, Aug 31, 2025 at 21:58 Roman Khachatryan <[email protected]> wrote:

> Hi Zakelly,
>
> > So I assume there is a conditional branch to determine whether current
> > stream key is in VALUE or MAP mode, and this also involves some state
> > access right?
>
> Yes, that's a good question; it might add one more state access.
> To mitigate that to some extent, the result of the first access to
> ValueState can be cached and reused later for add/retract (if it's not
> empty). Such caching is already included in the benchmark results.
>
> > Have you evaluated the implementation relying on the orderliness
> > of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the
> > first entry of the map in binary order. We could reverse the order of
> > seq by generating it from Long.MAX to 0, and thus the first entry
> > retrieved would be the last one added. I see you mention this in
> > 'Rejected Alternatives' but I'm still curious whether this could achieve
> > an improvement. To my knowledge, the iterator might be less efficient
> > since it could not leverage bloom filters as point-lookup does and the
> > cache is unnecessary if only the first entry is needed (of course we
> > could remove that cache). It's not immediately clear which approach is
> > better, as each has its trade-offs.
>
> We tried a similar approach using an iterator. The benefits are
> diminished there by slow iteration (plus, iterator creation is also slow).
> Because of that, the performance was similar to the current
> implementation. We didn't compare the two approaches side by side, though.
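
The reversed-sequence idea discussed above can be sketched in plain Java.
This is only an illustration under assumptions: a TreeMap stands in for a
sorted map state, the class and method names are hypothetical, and the
sketch does not model RocksDB key serialization, iterator cost, or bloom
filters, which are the points the performance discussion turns on.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: sequence numbers are handed out in descending order,
// so the smallest key in a sorted map is always the most recently added row.
final class ReverseSeqSketch {
    // TreeMap is a stand-in for a map state whose entries are sorted by key.
    private final TreeMap<Long, String> rows = new TreeMap<>();
    private long nextSeq = Long.MAX_VALUE;

    // Stores the row under the next (smaller) sequence number and returns it.
    long add(String row) {
        long seq = nextSeq--;
        rows.put(seq, row);
        return seq;
    }

    // Removes the row stored under the given sequence number, if any.
    void retract(long seq) {
        rows.remove(seq);
    }

    // The first entry in key order is the last row added that is still present.
    String latest() {
        Map.Entry<Long, String> first = rows.firstEntry();
        return first == null ? null : first.getValue();
    }

    public static void main(String[] args) {
        ReverseSeqSketch s = new ReverseSeqSketch();
        s.add("a");
        long b = s.add("b");
        System.out.println(s.latest());
        s.retract(b);
        System.out.println(s.latest());
    }
}
```

With this layout, finding the row to emit after a retraction needs only the
first entry in key order, with no prev/next pointers; whether that beats
point lookups in practice is exactly the open trade-off in this thread.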
>
> > I’d also suggest testing scenarios with retraction rates below 100%, as
> > that may better reflect real-world workloads IIUC.
>
> I mostly focused on 100% because that's where I saw the regression:
> the fewer the retractions, the longer the list, and the worse the
> performance of ValueState.
> I'll re-run the benchmark with lower rates.
>
> > I think it's a good chance to push forward the discussion/design of
> > binary sorted map state (FLIP-220)[1] since this seems to be a good
> > application scenario. But I also think it's acceptable if we do some
> > hack to only rely on RocksDB's state implicitly rather than waiting for
> > the new state primitives, if it is truly beneficial.
>
> I agree, probably not for this FLIP (because of the above reasons), but for
> many other cases it would definitely be beneficial to expose the sortedness
> of RocksDB in some way. Multi-way join (FLIP-516) [1] is one such example.
>
> > By saying event-time based TTL, I meant to make it easier for users to
> > understand. The event-time could be defined and controlled/advanced by
> > user (SQL implementor or Flink user). e.g. Event-time could be watermark
> > progression or just record sequence (state will live through specific
> > number of records), or even be advanced by special control records for
> > Datastream users. This kind of user-controlled time advancement is what
> > I said "manually controllable". Such flexibility could be broadly
> > beneficial.
>
> Thanks for clarifying. Special control records are an interesting idea,
> and I think they should be easy to implement.
>
> > We’ve encountered cases where state expiration is inconsistent between
> > upstream and downstream SQL operators. With event-time based TTL,
> > operators could share a synchronized notion of time, allowing them to
> > expire state in a more coordinated way.
>
> Yeah, it's a pity that Flink doesn't have event-time TTL. But to solve
> cross-operator TTL inconsistency, we'd need to change multiple operators
> (or the state backend). I'm not sure we can efficiently support event-time
> TTL for the general case.
>
> P.S.: have a good vacation! :)
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator
>
> Regards,
> Roman
>
>
> On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan <[email protected]> wrote:
>
> > Hi Roman,
> >
> > Thanks for the answers! Please see my comments below:
> >
> > > The switch is performed under each stream key individually, when
> > > its specific list size
> > > reaches a threshold.
> >
> >
> > So I assume there is a conditional branch to determine whether current
> > stream key is in VALUE or MAP mode, and this also involves some state
> > access right?
> >
> > > Yes, those pointers are necessary, I couldn't find a way to get rid of
> > > them:
> > > - prevSeqNo is used to emit the penultimate element when retracting the
> > > last one;
> > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> > > from the middle
> >
> >
> > Oh I see. Have you evaluated the implementation relying on the
> > orderliness of RocksDB's kv? RocksDB's `MapState.iterator().next()`
> > retrieves the first entry of the map in binary order. We could reverse
> > the order of seq by generating it from Long.MAX to 0, and thus the first
> > entry retrieved would be the last one added. I see you mention this in
> > 'Rejected Alternatives' but I'm still curious whether this could achieve
> > an improvement. To my knowledge, the iterator might be less efficient
> > since it could not leverage bloom filters as point-lookup does and the
> > cache is unnecessary if only the first entry is needed (of course we
> > could remove that cache). It's not immediately clear which approach is
> > better, as each has its trade-offs.
> > I’d also suggest testing scenarios with retraction rates below 100%, as
> > that may better reflect real-world workloads IIUC.
> >
> > I think it's a good chance to push forward the discussion/design of
> > binary sorted map state (FLIP-220)[1] since this seems to be a good
> > application scenario. But I also think it's acceptable if we do some
> > hack to only rely on RocksDB's state implicitly rather than waiting for
> > the new state primitives, if it is truly beneficial.
> >
> > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> >
> > > I agree, I think that event-time based TTL is more useful in general
> > > (I specified processing time as a default to make it less surprising
> > > for the users).
> > >
> > > I don't immediately see the potential usages of a manually controllable
> > > TtlTimeProvider - do you have any use cases in mind?
> >
> >
> > By saying event-time based TTL, I meant to make it easier for users to
> > understand. The event-time could be defined and controlled/advanced by
> > user (SQL implementor or Flink user). e.g. Event-time could be watermark
> > progression or just record sequence (state will live through specific
> > number of records), or even be advanced by special control records for
> > Datastream users. This kind of user-controlled time advancement is what
> > I said "manually controllable". Such flexibility could be broadly
> > beneficial.
> >
> > We’ve encountered cases where state expiration is inconsistent between
> > upstream and downstream SQL operators. With event-time based TTL,
> > operators could share a synchronized notion of time, allowing them to
> > expire state in a more coordinated way.
> >
> >
> > Looking forward to your reply!  P.S. I'm on vacation for the next few
> > days, so I'll follow up later :) .
> >
> > Best,
> > Zakelly
> >
> > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <[email protected]>
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for the feedback!
> > >
> > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > > > between VALUE and MAP performed under each stream key considering
> > > > each list size, or is it performed for all keys if the average list
> > > > size reaches the given thresholds?
> > >
> > > The switch is performed under each stream key individually, when
> > > its specific list size
> > > reaches a threshold.
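
A minimal sketch of such a per-key switch, written in plain Java under
assumptions: one instance of the hypothetical class below represents the
state of a single stream key, an in-memory list stands in for the VALUE
layout and an in-memory map for the MAP layout, and only the switch from
VALUE to MAP is shown. None of these names come from the FLIP.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: state of ONE stream key that starts in VALUE mode
// (a single list) and moves to MAP mode (one entry per row) once its own
// list size reaches the threshold.
final class AdaptiveKeyStateSketch {
    enum Mode { VALUE, MAP }

    private final int threshold;
    private Mode mode = Mode.VALUE;
    private final List<String> valueRows = new ArrayList<>();
    private final Map<Long, String> mapRows = new LinkedHashMap<>();
    private long nextSeq = 0;

    AdaptiveKeyStateSketch(int threshold) {
        this.threshold = threshold;
    }

    // Adds a row using the layout that is currently active for this key.
    void add(String row) {
        if (mode == Mode.VALUE) {
            valueRows.add(row);
            if (valueRows.size() >= threshold) {
                switchToMap();
            }
        } else {
            mapRows.put(nextSeq++, row);
        }
    }

    // Moves every row of this key from the list layout to the map layout.
    private void switchToMap() {
        for (String row : valueRows) {
            mapRows.put(nextSeq++, row);
        }
        valueRows.clear();
        mode = Mode.MAP;
    }

    Mode mode() {
        return mode;
    }

    int size() {
        return mode == Mode.VALUE ? valueRows.size() : mapRows.size();
    }
}
```

Because each key owns its own instance, a key with a long list can switch
while other keys with short lists stay in VALUE mode.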
> > >
> > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > > > to link all the nodes? I assume there should be a traversal need but
> > > > I don't see that in pseudo-code.
> > >
> > > Yes, those pointers are necessary, I couldn't find a way to get rid of
> > > them:
> > > - prevSeqNo is used to emit the penultimate element when retracting the
> > > last one;
> > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> > > from the middle
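
The role of the two pointers can be illustrated with a small plain-Java
sketch. It is an assumption-laden illustration, not the FLIP's pseudo-code:
a HashMap stands in for the `sqnToNode` state, rows are plain strings, and
the class, the NONE marker, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: rows are nodes of a doubly linked list stored in a
// map keyed by sequence number, so retraction needs only point lookups.
final class LinkedRowsSketch {
    static final long NONE = -1L; // marker for "no neighbour"

    static final class Node {
        final String row;
        long prevSeqNo;
        long nextSeqNo;

        Node(String row, long prevSeqNo, long nextSeqNo) {
            this.row = row;
            this.prevSeqNo = prevSeqNo;
            this.nextSeqNo = nextSeqNo;
        }
    }

    private final Map<Long, Node> sqnToNode = new HashMap<>();
    private long lastSeqNo = NONE;
    private long counter = 0;

    // Appends a row at the tail and links it to the previous tail.
    long add(String row) {
        long seq = counter++;
        sqnToNode.put(seq, new Node(row, lastSeqNo, NONE));
        if (lastSeqNo != NONE) {
            sqnToNode.get(lastSeqNo).nextSeqNo = seq;
        }
        lastSeqNo = seq;
        return seq;
    }

    // Removes a row. Returns the row that becomes the new last element when
    // the tail was retracted, or null if the tail is unchanged or the list
    // became empty.
    String retract(long seq) {
        Node node = sqnToNode.remove(seq);
        if (node == null) {
            return null;
        }
        if (node.prevSeqNo != NONE) {
            sqnToNode.get(node.prevSeqNo).nextSeqNo = node.nextSeqNo;
        }
        if (node.nextSeqNo != NONE) {
            // Middle (or head) removal: nextSeqNo locates the successor so
            // that its prevSeqNo can be repaired.
            sqnToNode.get(node.nextSeqNo).prevSeqNo = node.prevSeqNo;
            return null;
        }
        // Tail removal: prevSeqNo locates the penultimate element to emit.
        lastSeqNo = node.prevSeqNo;
        return lastSeqNo == NONE ? null : sqnToNode.get(lastSeqNo).row;
    }
}
```

For example, after adding a, b, c and retracting b from the middle,
retracting c yields a, because c's prevSeqNo was repaired via b's
nextSeqNo.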
> > >
> > > > And is `MapState.iterator` also feasible?
> > > Yes, in fact, the ADAPTIVE strategy uses an iterator to move the
> > > entries between MAP and VALUE.
> > >
> > > > 3. I see there are two `RowData` stored for one record, one is in
> > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is
> > > > for upsert-keys. Would it be optimized to single copy for a
> > > > non-upsert-key scenario?
> > >
> > > That's an interesting idea! I'll try to dig into it deeper when
> > > open-sourcing or as a follow-up.
> > >
> > > > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > > > ttl', which allows the user to specify insertion time for each
> > > > insert/update operation and a manually controllable `TtlTimeProvider`
> > > > (instead of just system time). This would be beneficial for many
> > > > cases, WDYT?
> > >
> > > I agree, I think that event-time based TTL is more useful in general
> > > (I specified processing time as a default to make it less surprising
> > > for the users).
> > >
> > > I don't immediately see the potential usages of a manually controllable
> > > TtlTimeProvider - do you have any use cases in mind?
> > >
> > > > 5. Does the current RocksDB benchmark involve significant state size
> > > > and I/O pressure?
> > >
> > > No, in the micro-benchmark the state wasn't too big (on the order of
> > > megabytes); it was bottlenecked by RocksDB put/get operations, however.
> > > I also performed a benchmark on a cluster with a larger state size
> > > (on the order of gigabytes) and got similar results.
> > >
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <[email protected]>
> > > wrote:
> > >
> > > > Hi Roman,
> > > >
> > > > Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes
> > > > a bottleneck in our production, so I'd +1 to optimize it. I have
> > > > several questions regarding your design:
> > > >
> > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > > > between VALUE and MAP performed under each stream key considering
> > > > each list size, or is it performed for all keys if the average list
> > > > size reaches the given thresholds?
> > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > > > to link all the nodes? I assume there should be a traversal need but
> > > > I don't see that in pseudo-code. And is `MapState.iterator` also
> > > > feasible?
> > > > 3. I see there are two `RowData` stored for one record, one is in
> > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is
> > > > for upsert-keys. Would it be optimized to single copy for a
> > > > non-upsert-key scenario?
> > > > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > > > ttl', which allows the user to specify insertion time for each
> > > > insert/update operation and a manually controllable `TtlTimeProvider`
> > > > (instead of just system time). This would be beneficial for many
> > > > cases, WDYT?
> > > > 5. Does the current RocksDB benchmark involve significant state size
> > > > and I/O pressure?
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > I would like to start a discussion about FLIP-544
> > > > > SinkUpsertMaterializer V2 [1].
> > > > >
> > > > > SinkUpsertMaterializer is an operator in Flink that reconciles
> > > > > out-of-order changelog events before sending them to an upsert sink.
> > > > > In some cases (that we see in our production), the performance of
> > > > > this operator degrades exponentially, depending on the input data.
> > > > > This FLIP proposes a new implementation that is optimized for such
> > > > > cases and serves as a synchronization point for other efforts in
> > > > > that area.
> > > > >
> > > > > Looking forward to feedback.
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> > > > >
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > >
> > >
> >
>
