Thanks Roman for driving this. I highly support this effort. Big +1 from my side.

Best,
Piotrek

On Sun, Aug 31, 2025 at 9:58 PM Roman Khachatryan <[email protected]> wrote:

> Hi Zakelly,
>
> > So I assume there is a conditional branch to determine whether current
> > stream key is in VALUE or MAP mode, and this also involves some state
> > access right?
>
> Yes, that's a good question; it might add one more state access.
> To mitigate that to some extent, the result of the first access to
> ValueState can be cached and reused later for add/retract (in case it's
> not empty). Such caching is already included in the benchmark results.
>
> > Have you evaluated the implementation relying on the orderliness
> > of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the
> > first entry of the map in binary order. We could reverse the order of
> > seq by generating it from Long.MAX_VALUE down to 0, and thus the first
> > entry retrieved would be the last one added. I see you mention this in
> > 'Rejected Alternatives' but I'm still curious whether this could achieve
> > an improvement. To my knowledge, the iterator might be less efficient
> > since it could not leverage bloom filters as point-lookup does and the
> > cache is unnecessary if only the first entry is needed (of course we
> > could remove that cache). It's not immediately clear which approach is
> > better, as each has its trade-offs.
>
> We tried a similar approach of using an iterator. The benefits are
> diminished there by slow iteration (plus, iterator creation is also slow).
> Because of that, the performance was similar to the current
> implementation. We didn't compare the two approaches side by side though.
>
> > I’d also suggest testing scenarios with retraction rates below 100%, as
> > that may better reflect real-world workloads IIUC.
>
> I mostly focused on 100% because that's where I saw regression:
> the fewer retractions, the longer the list, the worse the performance of
> ValueState.
> I'll re-run the benchmark with lower rates.
>
> > I think it's a good chance to push forward the discussion/design of
> > binary sorted map state (FLIP-220)[1] since this seems to be a good
> > application scenario. But I also think it's acceptable if we do some
> > hack to only rely on RocksDB's state implicitly rather than waiting for
> > the new state primitives, if it is truly beneficial.
>
> I agree, probably not for this FLIP (because of the above reasons), but for
> many other cases it would definitely be beneficial to expose the sortedness
> of RocksDB in some way. Multi-way join (FLIP-516) [1] is one such example.
>
> > By saying event-time based TTL, I meant to make it easier for users to
> > understand. The event-time could be defined and controlled/advanced by
> > the user (SQL implementor or Flink user). E.g. event-time could be
> > watermark progression or just record sequence (state will live through a
> > specific number of records), or even be advanced by special control
> > records for DataStream users. This kind of user-controlled time
> > advancement is what I meant by "manually controllable". Such flexibility
> > could be broadly beneficial.
>
> Thanks for clarifying, special control records is an interesting idea, and
> I think it should be easy to implement.
>
> > We’ve encountered cases where state expiration is inconsistent between
> > upstream and downstream SQL operators. With event-time based TTL,
> > operators could share a synchronized notion of time, allowing them to
> > expire state in a more coordinated way.
>
> Yeah, it's a pity that Flink doesn't have event-time TTL. But to solve
> cross-operator TTL inconsistency, we'd need to change multiple operators
> (or the state backend). I'm not sure we can efficiently support event-time
> TTL for the general case.
>
> P.S.: have a good vacation!
> :)
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator
>
> Regards,
> Roman
>
>
> On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan <[email protected]> wrote:
>
> > Hi Roman,
> >
> > Thanks for the answers! Please see my comments below:
> >
> > > The switch is performed under each stream key individually, when
> > > its specific list size reaches a threshold.
> >
> > So I assume there is a conditional branch to determine whether current
> > stream key is in VALUE or MAP mode, and this also involves some state
> > access right?
> >
> > > Yes, those pointers are necessary, I couldn't find a way to get rid of
> > > them:
> > > - prevSeqNo is used to emit the penultimate element when retracting the
> > >   last one;
> > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> > >   from the middle
> >
> > Oh I see. Have you evaluated the implementation relying on the
> > orderliness of RocksDB's kv? RocksDB's `MapState.iterator().next()`
> > retrieves the first entry of the map in binary order. We could reverse
> > the order of seq by generating it from Long.MAX_VALUE down to 0, and thus
> > the first entry retrieved would be the last one added. I see you mention
> > this in 'Rejected Alternatives' but I'm still curious whether this could
> > achieve an improvement. To my knowledge, the iterator might be less
> > efficient since it could not leverage bloom filters as point-lookup does
> > and the cache is unnecessary if only the first entry is needed (of course
> > we could remove that cache). It's not immediately clear which approach is
> > better, as each has its trade-offs.
> > I’d also suggest testing scenarios with retraction rates below 100%, as
> > that may better reflect real-world workloads IIUC.
> >
> > I think it's a good chance to push forward the discussion/design of
> > binary sorted map state (FLIP-220)[1] since this seems to be a good
> > application scenario.
> > But I also think it's acceptable if we do some hack to only rely
> > on RocksDB's state implicitly rather than waiting for the new
> > state primitives, if it is truly beneficial.
> >
> > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> >
> > > I agree, I think that event-time based TTL is more useful in general
> > > (I specified processing time as a default to make it less surprising
> > > for the users).
> > >
> > > I don't immediately see the potential usages of a manually controllable
> > > TtlTimeProvider - do you have any use cases in mind?
> >
> > By saying event-time based TTL, I meant to make it easier for users to
> > understand. The event-time could be defined and controlled/advanced by
> > the user (SQL implementor or Flink user). E.g. event-time could be
> > watermark progression or just record sequence (state will live through a
> > specific number of records), or even be advanced by special control
> > records for DataStream users. This kind of user-controlled time
> > advancement is what I meant by "manually controllable". Such flexibility
> > could be broadly beneficial.
> >
> > We’ve encountered cases where state expiration is inconsistent between
> > upstream and downstream SQL operators. With event-time based TTL,
> > operators could share a synchronized notion of time, allowing them to
> > expire state in a more coordinated way.
> >
> >
> > Looking forward to your reply! P.S. I'm on vacation for the next few
> > days, so I'll follow up later :) .
> >
> > Best,
> > Zakelly
> >
> > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <[email protected]>
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for the feedback!
> > >
> > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > > > between VALUE and MAP performed under each stream key considering
> > > > each list size, or is it performed for all keys if the average list
> > > > size reaches the given thresholds?
> > >
> > > The switch is performed under each stream key individually, when
> > > its specific list size reaches a threshold.
> > >
> > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > > > to link all the nodes? I assume there should be a traversal need but
> > > > I don't see that in pseudo-code.
> > >
> > > Yes, those pointers are necessary, I couldn't find a way to get rid of
> > > them:
> > > - prevSeqNo is used to emit the penultimate element when retracting the
> > >   last one;
> > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> > >   from the middle
> > >
> > > > And is `MapState.iterator` also feasible?
> > >
> > > Yes, in fact, the ADAPTIVE strategy uses an iterator to move the
> > > entries between MAP and VALUE.
> > >
> > > > 3. I see there are two `RowData` stored for one record, one is in
> > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is
> > > > for upsert-keys. Would it be optimized to single copy for a
> > > > non-upsert-key scenario?
> > >
> > > That's an interesting idea! I'll try to dig into it deeper when
> > > open-sourcing or as a follow-up.
> > >
> > > > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > > > ttl', which allows the user to specify insertion time for each
> > > > insert/update operation and a manually controllable `TtlTimeProvider`
> > > > (instead of just system time). This would be beneficial for many
> > > > cases, WDYT?
> > >
> > > I agree, I think that event-time based TTL is more useful in general
> > > (I specified processing time as a default to make it less surprising
> > > for the users).
> > >
> > > I don't immediately see the potential usages of a manually controllable
> > > TtlTimeProvider - do you have any use cases in mind?
> > >
> > > > 5. Does the current RocksDB benchmark involve significant state size
> > > > and I/O pressure?
> > >
> > > No, in the micro-benchmark the state wasn't too big (on the order of
> > > megabytes);
> > > it was bottlenecked by RocksDB put/get operations, however.
> > > I also performed a benchmark on a cluster with a larger state size
> > > (on the order of gigabytes) and got similar results.
> > >
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <[email protected]>
> > > wrote:
> > >
> > > > Hi Roman,
> > > >
> > > > Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes
> > > > a bottleneck in our production, so I'd +1 to optimize it. I have
> > > > several questions regarding your design:
> > > >
> > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > > > between VALUE and MAP performed under each stream key considering
> > > > each list size, or is it performed for all keys if the average list
> > > > size reaches the given thresholds?
> > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > > > to link all the nodes? I assume there should be a traversal need but
> > > > I don't see that in pseudo-code. And is `MapState.iterator` also
> > > > feasible?
> > > > 3. I see there are two `RowData` stored for one record, one is in
> > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is
> > > > for upsert-keys. Would it be optimized to single copy for a
> > > > non-upsert-key scenario?
> > > > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > > > ttl', which allows the user to specify insertion time for each
> > > > insert/update operation and a manually controllable `TtlTimeProvider`
> > > > (instead of just system time). This would be beneficial for many
> > > > cases, WDYT?
> > > > 5. Does the current RocksDB benchmark involve significant state size
> > > > and I/O pressure?
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > I would like to start a discussion about FLIP-544
> > > > > SinkUpsertMaterializer V2 [1].
> > > > >
> > > > > SinkUpsertMaterializer is an operator in Flink that reconciles out
> > > > > of order changelog events before sending them to an upsert sink. In
> > > > > some cases (that we see in our production), performance of this
> > > > > operator degrades exponentially, depending on the input data.
> > > > > This FLIP proposes a new implementation that is optimized for such
> > > > > cases and serves as a synchronization point for other efforts in
> > > > > that area.
> > > > >
> > > > > Looking forward to feedback.
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> > > > >
> > > > > Regards,
> > > > > Roman
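
For anyone skimming the quoted thread, here is a minimal in-memory sketch of the prevSeqNo/nextSeqNo linking that Roman describes. It is not the FLIP-544 code. Plain HashMaps stand in for Flink's MapState, rows are Strings and assumed to be unique, and the class and method names are made up for illustration. Only `sqnToNode`, `rowToSqn`, `prevSeqNo` and `nextSeqNo` are taken from the thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the per-key state; HashMaps replace Flink MapState. */
class LinkedRowsSketch {

    /** A stored row plus links to its neighbours by sequence number (null = no neighbour). */
    static final class Node {
        final String row;
        Long prevSeqNo;
        Long nextSeqNo;

        Node(String row, Long prevSeqNo) {
            this.row = row;
            this.prevSeqNo = prevSeqNo;
        }
    }

    private final Map<Long, Node> sqnToNode = new HashMap<>();  // seqNo -> node
    private final Map<String, Long> rowToSqn = new HashMap<>(); // row -> seqNo (rows assumed unique)
    private Long lastSeqNo = null;  // seqNo of the most recently added live row
    private long nextFreeSeqNo = 0; // assumption: a simple increasing counter

    /** Appends a row to the end of the list; the new row becomes the latest one. */
    String add(String row) {
        long sqn = nextFreeSeqNo++;
        if (lastSeqNo != null) {
            sqnToNode.get(lastSeqNo).nextSeqNo = sqn;
        }
        sqnToNode.put(sqn, new Node(row, lastSeqNo));
        rowToSqn.put(row, sqn);
        lastSeqNo = sqn;
        return row;
    }

    /** Removes a row and returns the latest remaining row, or null if none is left. */
    String retract(String row) {
        Long sqn = rowToSqn.remove(row);
        if (sqn == null) {
            return latest(); // unknown row: nothing to unlink
        }
        Node node = sqnToNode.remove(sqn);
        if (node.prevSeqNo != null) {
            // nextSeqNo of the predecessor skips over the removed node
            sqnToNode.get(node.prevSeqNo).nextSeqNo = node.nextSeqNo;
        }
        if (node.nextSeqNo != null) {
            // retraction from the middle: keep the successor's prevSeqNo correct
            sqnToNode.get(node.nextSeqNo).prevSeqNo = node.prevSeqNo;
        } else {
            // retraction of the last row: the penultimate row becomes the latest
            lastSeqNo = node.prevSeqNo;
        }
        return latest();
    }

    /** Returns the most recently added row that has not been retracted. */
    String latest() {
        return lastSeqNo == null ? null : sqnToNode.get(lastSeqNo).row;
    }
}
```

Both `add` and `retract` use only point lookups, with no iteration, which is the property the thread contrasts with the iterator-based alternative.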
