Thanks everyone for the feedback! We've discussed the configuration offline with Rui and Zakelly and agreed to keep the options and mark them @Experimental; and to choose the defaults based on state backend type. For that, KeyedStateStore interface needs to be extended. I've updated the FLIP to reflect this.
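The backend-dependent default selection mentioned above could be wired up roughly as follows. This is a minimal sketch only: the class and method names are hypothetical, the `Integer`-as-unset convention is an illustrative stand-in for Flink's real ConfigOption handling, and the threshold values are the example numbers from this thread, not the FLIP's final defaults.

```java
// Sketch: choosing per-backend defaults for the adaptive thresholds when the
// user leaves the @Experimental options unset. All names and values here are
// illustrative assumptions, not the FLIP's actual API.
class AdaptiveThresholdDefaults {

    enum BackendType { ROCKSDB, HASHMAP }

    // RocksDB pays per-access (de)serialization and I/O cost, so it should
    // switch to MAP mode at much smaller list sizes than the heap backend.
    private static final int ROCKSDB_HIGH_THRESHOLD = 50;
    private static final int ROCKSDB_LOW_THRESHOLD = 40;
    private static final int HASHMAP_HIGH_THRESHOLD = 400;
    private static final int HASHMAP_LOW_THRESHOLD = 300;

    /** High threshold: switch VALUE -> MAP when a key's list grows past it. */
    static int highThreshold(BackendType backend, Integer userConfigured) {
        if (userConfigured != null) {
            return userConfigured; // an explicitly configured option always wins
        }
        return backend == BackendType.ROCKSDB ? ROCKSDB_HIGH_THRESHOLD : HASHMAP_HIGH_THRESHOLD;
    }

    /** Low threshold: switch MAP -> VALUE when a key's list shrinks below it. */
    static int lowThreshold(BackendType backend, Integer userConfigured) {
        if (userConfigured != null) {
            return userConfigured;
        }
        return backend == BackendType.ROCKSDB ? ROCKSDB_LOW_THRESHOLD : HASHMAP_LOW_THRESHOLD;
    }
}
```

Keeping the options @Experimental while defaulting per backend preserves both escape hatches discussed in the thread: users can still override, and the defaults (or the strategy itself) can change later without a compatibility break.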
I'm going to start a voting thread soon unless there are other suggestions or objections. Regards, Roman On Fri, Sep 12, 2025, 14:21 Lincoln Lee <[email protected]> wrote: > +1 to the optimization plan and mark the new options as experimental. > > Optimizing state access after data expansion will help ease performance > bottlenecks when this operator can’t be fully avoided. > > Thanks for pushing this forward! > > Best, > Lincoln Lee > > > Roman Khachatryan <[email protected]> 于2025年9月12日周五 16:38写道: > > > Hey Rui, > > > > Thanks for clarifying > > > > In my opinion, the flexibility is necessary here for a number of reasons > > (e.g. payload size may vary per job, TM disk cache size is different, > even > > user priorities might be different). > > > > We can mark these options as @Experimental for now to be able to change > > them in the future. And/or, the improved switching algorithm can be just > a > > new strategy, without the need to touch these options. > > > > > I don't have a strong opinion on this, if it's really necessary to > retain > > flexibility. > > > > In that case, I'd like to start the vote today or on Monday unless > there's > > any further feedback. > > > > Regards, > > Roman > > > > On Fri, Sep 12, 2025, 10:07 Rui Fan <[email protected]> wrote: > > > > > Hey Roman, > > > > > > Thanks for your explanation for the first 2 questions, it is clear for > > me! > > > > > > About the 3rd question: > > > > > > > I totally agree that configuration options add complexity and > increase > > > > maintenance burden, but in my opinion this is a trade-off between > > > > flexibility and complexity. > > > > > > Totally agree that it is a trade-off. > > > > > > > In this case, I don't see how the minimum flexibility can be achieved > > > > without adding those options. On the other hand, providing sensible > > > > defaults makes it unnecessary to adjust these settings for most of > the > > > > users. 
> > > > > > IIUC, it is hard to provide unified sensible defaults since rocksdb and > > > hashmap state backend expect different thresholds. > > > > > > > Could you elaborate what do you mean by list size and how is it > > different > > > > from threshold.high and threshold.low proposed in the FLIP? > > > > > > What I mean is that we do not expose the threshold.high and > threshold.low > > > configuration options, but hard-code them inside the Flink code to > select > > > different thresholds depending on whether rocksdb or hashmap is used. > > > > > > Following is the pseudocode: > > > > > > ``` > > > private static final int ROCKSDB_HIGH_THRESHOLD = 50; > > > private static final int ROCKSDB_LOW_THRESHOLD = 40; > > > private static final int HASHMAP_HIGH_THRESHOLD = 400; > > > private static final int HASHMAP_LOW_THRESHOLD = 300; > > > > > > int highThreshold = isRocksdb ? > > > ROCKSDB_HIGH_THRESHOLD : HASHMAP_HIGH_THRESHOLD; > > > int lowThreshold = isRocksdb ? > > ROCKSDB_LOW_THRESHOLD : HASHMAP_LOW_THRESHOLD; > > > ``` > > > > > > In the short term, users lose flexibility, but the code and usability > are > > > simpler. > > > Most users do not need to adjust thresholds for HashMap and RocksDB. > > > > > > In the long term, if Flink wants to introduce other similar switching > > > strategies > > > other than thresholds, it can switch directly internally without > > > considering the > > > compatibility of configuration options. > > > > > > I don't have a strong opinion on this, if it's really necessary to > retain > > > flexibility. > > > > > > Best, > > > Rui > > > > > > On Thu, Sep 11, 2025 at 11:02 PM Roman Khachatryan <[email protected]> > > > wrote: > > > > > > > Hey Rui, > > > > > > > > Thanks for your feedback and questions! > > > > > > > > > 1. 
State Compatibility: > > > > > > > > > I would like to confirm if the underlying stored > > > > > data structure is the same in the following two scenarios: > > > > > > > > > - A ValueState stored in LEGACY mode. > > > > > - A ValueState stored in VALUE mode (as used by the ADAPTIVE > strategy > > > > > when below the threshold). > > > > > > > > > The FLIP mentions that VALUE - similar to LEGACY, but I do not > fully > > > know > > > > > what the difference is. If their physical storage format is > > identical, > > > it > > > > > seems > > > > > there might be an opportunity to provide state compatibility. > > > > > > > > No, in these two cases, the state is not compatible. This is because > > > > in VALUE mode, a timestamp is stored along with every record, plus > > > > metadata stores equalizer and hash function generators. > > > > > > > > The FLIP targets only new jobs. In the future, it is possible to add > > > > a migration path. The motivation is simplicity and reduction of risks > > > > (during state migration). > > > > > > > > > 2. Forward Compatibility for TTL > > > > > > > > > The FLIP mentions that TTL is a follow-up feature, that "storing > the > > > > > timestamps along with every record will be implemented from the get > > > go". > > > > > This implies a future state schema evolution. Could you elaborate > on > > > how > > > > > the V1 serializer will be designed to handle this, ensuring that > > > adopting > > > > > V1 > > > > > of this feature won't require another state migration when TTL > > support > > > is > > > > > added in V2? > > > > > > > > There's no migration necessary to add TTL support for V2: the > > timestamps > > > > will already be there, and there's no need for extra data. > > > > For V1 (the current implementation), a migration path would be > needed. > > > > > > > > > 3. Adaptive Threshold Configuration > > > > > > > > > FLIP introduces adaptive.threshold.high and adaptive.threshold.low. 
> > > > > This adds a configuration burden on users. To set these values > > > correctly, > > > > > users need to understand the underlying performance > characteristics. > > > > > For example, the FLIP recommends very different thresholds for > > RocksDB > > > > > and Heap backends, which highlights this complexity. > > > > > > > > > Furthermore, from historical experience, it is easy to expose a new > > > > > configuration, > > > > > but much harder to change or remove it later due to user > > compatibility > > > > > concerns. > > > > > If we don't expose these parameters now, it would allow the > internal > > > > > adaptive > > > > > strategy to be improved in the future without any compatibility > > issues. > > > > > > > > I totally agree that configuration options add complexity and > increase > > > > maintenance burden, but in my opinion this is a trade-off between > > > > flexibility and complexity. > > > > In this case, I don't see how the minimum flexibility can be achieved > > > > without adding those options. On the other hand, providing sensible > > > > defaults makes it unnecessary to adjust these settings for most of > the > > > > users. > > > > > > > > > In the first version, I think list size might be enough as an > > internal > > > > > strategy, > > > > > and we can dynamically choose different thresholds for heap and > > > rocksdb. > > > > > > > > Could you elaborate what do you mean by list size and how is it > > different > > > > from threshold.high and threshold.low proposed in the FLIP? > > > > > > > > Regards, > > > > Roman > > > > > > > > > > > > On Wed, Sep 3, 2025 at 12:32 PM Rui Fan <[email protected]> > wrote: > > > > > > > > > Hey all, > > > > > > > > > > Thanks for driving this great proposal and valuable discussion! > > > > > I have a few questions related to compatibility and usability. > > > > > > > > > > 1. 
State Compatibility: > > > > > > > > > > The compatibility of state between the old and new modes, which is > > > > crucial > > > > > for migrating existing jobs. I would like to confirm if the > > underlying > > > > > stored > > > > > data structure is the same in the following two scenarios: > > > > > > > > > > - A ValueState stored in LEGACY mode. > > > > > - A ValueState stored in VALUE mode (as used by the ADAPTIVE > strategy > > > > > when below the threshold). > > > > > > > > > > The FLIP mentions that VALUE - similar to LEGACY, but I do not > fully > > > know > > > > > what the difference is. If their physical storage format is > > identical, > > > it > > > > > seems > > > > > there might be an opportunity to provide state compatibility. > > > > > > > > > > 2. Forward Compatibility for TTL > > > > > > > > > > The FLIP mentions that TTL is a follow-up feature, that "storing > the > > > > > timestamps along with every record will be implemented from the get > > > go". > > > > > This implies a future state schema evolution. Could you elaborate > on > > > how > > > > > the V1 serializer will be designed to handle this, ensuring that > > > adopting > > > > > V1 > > > > > of this feature won't require another state migration when TTL > > support > > > is > > > > > added in V2? > > > > > > > > > > 3. Adaptive Threshold Configuration > > > > > > > > > > FLIP introduces adaptive.threshold.high and adaptive.threshold.low. > > > > > This adds a configuration burden on users. To set these values > > > correctly, > > > > > users need to understand the underlying performance > characteristics. > > > > > For example, the FLIP recommends very different thresholds for > > RocksDB > > > > > and Heap backends, which highlights this complexity. > > > > > > > > > > Furthermore, from historical experience, it is easy to expose a new > > > > > configuration, > > > > > but much harder to change or remove it later due to user > > compatibility > > > > > concerns. 
> > > > > If we don't expose these parameters now, it would allow the > internal > > > > > adaptive > > > > > strategy to be improved in the future without any compatibility > > issues. > > > > > > > > > > For example, future policies could be based not only on list length > > but > > > > > also on the > > > > > size of RowData in their cost models. Furthermore, operators could > > even > > > > > monitor > > > > > the time it takes to access state when processing a specific key, > > > > > triggering a mode > > > > > switch only when latency exceeds a dynamic baseline. This not only > > > makes > > > > > operators truly "adaptive" but also greatly simplifies user > > > > configuration. > > > > > > > > > > In the first version, I think list size might be enough as an > > internal > > > > > strategy, > > > > > and we can dynamically choose different thresholds for heap and > > > rocksdb. > > > > > > > > > > Looking forward to your feedback! > > > > > > > > > > Best, > > > > > Rui > > > > > > > > > > On Mon, Sep 1, 2025 at 3:13 PM Piotr Nowojski < > [email protected]> > > > > > wrote: > > > > > > > > > > > Thanks Roman for driving this. I highly support this effort. > > > > > > > > > > > > Big +1 from my side. > > > > > > > > > > > > Best, > > > > > > Piotrek > > > > > > > > > > > > niedz., 31 sie 2025 o 21:58 Roman Khachatryan <[email protected]> > > > > > > napisał(a): > > > > > > > > > > > > > Hi Zakelly, > > > > > > > > > > > > > > > So I assume there is a conditional branch to determine > whether > > > > > current > > > > > > > > stream key is in VALUE or MAP mode, and this also involves > some > > > > state > > > > > > > > access right? > > > > > > > > > > > > > > Yes, that's a good question; it might add one more state > access. > > > > > > > To mitigate that to some extent, the results of the 1st access > to > > > > > > > ValueState > > > > > > > can be cached and be used later for add/retract (in case it's > not > > > > > empty). 
> > > > > > > (such caching is already included in the benchmark results) > > > > > > > > > > > > > > > Have you evaluated the implementation relying on the > > orderliness > > > > > > > > of RocksDB's kv? RocksDB's `MapState.iterator().next()` > > retrieves > > > > the > > > > > > > first > > > > > > > > entry of the map in binary order. We could reverse the order > of > > > seq > > > > > by > > > > > > > > generating it from Long.MAX to 0, and thus the first entry > > > > retrieved > > > > > > > would > > > > > > > > be the last one added. I see you mention this in 'Rejected > > > > > > Alternatives' > > > > > > > > but I'm still curious whether this could achieve an > > improvement. > > > To > > > > > > > > my knowledge, the iterator might be less efficient since it > > could > > > > not > > > > > > > > leverage bloom filters as point-lookup does and the cache is > > > > > > unnecessary > > > > > > > if > > > > > > > > only the first entry is needed (of course we could remove > that > > > > > cache). > > > > > > > It's > > > > > > > > not immediately clear which approach is better, as each has > its > > > > > > > trade-offs. > > > > > > > > > > > > > > We tried a similar approach of using an iterator. The benefits > > are > > > > > > > diminished there > > > > > > > by slow iteration (plus, iterator creation is also slow). > Because > > > of > > > > > > that, > > > > > > > the > > > > > > > performance was similar to the current implementation. We > didn't > > > > > compare > > > > > > > the > > > > > > > two approaches side by side though. > > > > > > > > > > > > > > > I’d also suggest testing scenarios with retraction rates > below > > > > 100%, > > > > > as > > > > > > > > that may better reflect real-world workloads IIUC. > > > > > > > > > > > > > > I mostly focused on 100% because that's where I saw regression: > > > > > > > the fewer retractions, the longer the list, the worse > performance > > > of > > > > > > > ValueState. 
> > > > > > > I'll re-run the benchmark with lower rates. > > > > > > > > > > > > > > > I think it's a good chance to push forward the > > discussion/design > of > > > > > > > binary > > > > > > > > sorted map state (FLIP-220)[1] since this seems to be a good > > > > > > application > > > > > > > > scenario. But I also think it's acceptable if we do some hack > to > > > > only > > > > > > > rely > > > > > > > > on RocksDB's state implicitly rather than waiting for the new > > > > > > > > state primitives, if it is truly beneficial. > > > > > > > > > > > > > > I agree, probably not for this FLIP (because of the above > > reasons), > but > > > > > > for > > > > > > > many other cases it would be definitely beneficial to expose > the > > > > > > sortedness > > > > > > > of RocksDB in some way. Multi-way join (FLIP-516) [1] is one > such > > > > > > example. > > > > > > > > > > > > > > > By saying event-time based TTL, I meant to make it easier for > > > users > > > > > to > > > > > > > > understand. The event-time could be defined and > > > controlled/advanced > > > > > by > > > > > > > user > > > > > > > > (SQL implementor or Flink user). e.g. Event-time could be > > > watermark > > > > > > > > progression or just record sequence (state will live through > > > > specific > > > > > > > > number of records), or even be advanced by special control > > > records > > > > > for > > > > > > > > DataStream users. This kind of user-controlled time advancement > > > is > > > > > > what I > > > > > > > > said "manually controllable". Such flexibility could be broadly > > > > > > > beneficial. > > > > > > > > > > > > > > Thanks for clarifying, special control records are an > interesting > > > > idea, > > > > > > and > > > > > > > I think it should be easy to implement. > > > > > > > > > > > > > > > We’ve encountered cases where state expiration is > inconsistent > > > > > between > > > > > > > > upstream and downstream SQL operators. 
With event-time based > > TTL, > > > > > > > operators > > > > > > > > could share a synchronized notion of time, allowing them to > > > expire > > > > > > state > > > > > > > in > > > > > > > > a more coordinated way. > > > > > > > > > > > > > > Yeah, that's a pity that Flink doesn't have event-time TTL. But > > to > > > > > solve > > > > > > > cross-operator TTL inconsistency, we'd need to change multiple > > > > > operators > > > > > > > (or > > > > > > > state backend). I'm not sure we can efficiently support event > > time > > > > TTL > > > > > > for > > > > > > > a > > > > > > > general case. > > > > > > > > > > > > > > P.S.: have a good vacation! :) > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator > > > > > > > > > > > > > > Regards, > > > > > > > Roman > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan < > > [email protected] > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Roman, > > > > > > > > > > > > > > > > Thanks for the answers! Please see my comments below: > > > > > > > > > > > > > > > > The switch is performed under each stream key individually, > > when > > > > > > > > > its specific list size > > > > > > > > > reaches a threshold. > > > > > > > > > > > > > > > > > > > > > > > > So I assume there is a conditional branch to determine > whether > > > > > current > > > > > > > > stream key is in VALUE or MAP mode, and this also involves > some > > > > state > > > > > > > > access right? 
> > > > > > > > > > > > > > > > Yes, those pointers are necessary, I couldn't find a way to get > > > rid > > > > > of > > > > > > > > them: > > > > > > > > > - prevSeqNo is used to emit penultimate element when > > retracting > > > > the > > > > > > last > > > > > > > > > one; > > > > > > > > > - nextSeqNo is used to keep prevSeqNo correct when > retracting > > > an > > > > > item > > > > > > > > from > > > > > > > > > the middle > > > > > > > > > > > > > > > > > > > > > > > > Oh I see. Have you evaluated the implementation relying on > the > > > > > > > orderliness > > > > > > > > of RocksDB's kv? RocksDB's `MapState.iterator().next()` > > retrieves > > > > the > > > > > > > first > > > > > > > > entry of the map in binary order. We could reverse the order > of > > > seq > > > > > by > > > > > > > > generating it from Long.MAX to 0, and thus the first entry > > > > retrieved > > > > > > > would > > > > > > > > be the last one added. I see you mention this in 'Rejected > > > > > > Alternatives' > > > > > > > > but I'm still curious whether this could achieve an > > improvement. > > > To > > > > > > > > my knowledge, the iterator might be less efficient since it > > could > > > > not > > > > > > > > leverage bloom filters as point-lookup does and the cache is > > > > > > unnecessary > > > > > > > if > > > > > > > > only the first entry is needed (of course we could remove > that > > > > > cache). > > > > > > > It's > > > > > > > > not immediately clear which approach is better, as each has > its > > > > > > > trade-offs. > > > > > > > > I’d also suggest testing scenarios with retraction rates > below > > > > 100%, > > > > > as > > > > > > > > that may better reflect real-world workloads IIUC. > > > > > > > > > > > > > > > > I think it's a good chance to push forward the > > discussion/design > of > > > > > > > binary > > > > > > > > sorted map state (FLIP-220)[1] since this seems to be a good > > > > > > application > > > > > > > > scenario. 
But I also think it's acceptable if we do some hack > > to > > > > only > > > > > > > rely > > > > > > > > on RocksDB's state implicitly rather than waiting for the new > > > > > > > > state primitives, if it is truly beneficial. > > > > > > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/x/Xo_FD > > > > > > > > > > > > > > > > I agree, I think that event-time based TTL is more useful in > > > > general > > > > > > > > > (I specified processing time as a default to make it less > > > > > surprising > > > > > > > for > > > > > > > > > the users). > > > > > > > > > > > > > > > > I don't immediately see the potential usages of a manually > > > > > controllable > > > > > > > > > TtlTimeProvider - do you have any use cases in mind? > > > > > > > > > > > > > > > > > > > > > > > > By saying event-time based TTL, I meant to make it easier for > > > users > > > > > to > > > > > > > > understand. The event-time could be defined and > > > controlled/advanced > > > > > by > > > > > > > user > > > > > > > > (SQL implementor or Flink user). e.g. Event-time could be > > > watermark > > > > > > > > progression or just record sequence (state will live through > > > > specific > > > > > > > > number of records), or even be advanced by special control > > > records > > > > > for > > > > > > > > DataStream users. This kind of user-controlled time advancement > > > is > > > > > > what I > > > > > > > > said "manually controllable". Such flexibility could be broadly > > > > > > > beneficial. > > > > > > > > > > > > > > > > We’ve encountered cases where state expiration is inconsistent > > > > > between > > > > > > > > upstream and downstream SQL operators. With event-time based > > TTL, > > > > > > > operators > > > > > > > > could share a synchronized notion of time, allowing them to > > > expire > > > > > > state > > > > > > > in > > > > > > > > a more coordinated way. > > > > > > > > > > > > > > > > > > > > > > > > Looking forward to your reply! P.S. 
I'm on vacation for the > > next > > > > few > > > > > > > days, > > > > > > > > so I'll follow up later :) . > > > > > > > > > > > > > > > > Best, > > > > > > > > Zakelly > > > > > > > > > > > > > > > > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan < > > > > [email protected]> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Zakelly, > > > > > > > > > > > > > > > > > > Thanks for the feedback! > > > > > > > > > > > > > > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is > the > > > > > switch > > > > > > > > > between > > > > > > > > > > VALUE and MAP performed under each stream key considering > > > each > > > > > list > > > > > > > > size, > > > > > > > > > > or is it performed for all keys if the average list size > > > > reaches > > > > > > the > > > > > > > > > given > > > > > > > > > > thresholds? > > > > > > > > > > > > > > > > > > The switch is performed under each stream key individually, > > > when > > > > > > > > > its specific list size > > > > > > > > > reaches a threshold. > > > > > > > > > > > > > > > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and > > > > > 'nextSeqNo' > > > > > > > to > > > > > > > > > link > > > > > > > > > > all the nodes? I assume there should be a traversal need > > but > > > I > > > > > > don't > > > > > > > > see > > > > > > > > > > that in pseudo-code. > > > > > > > > > > > > > > > > > > Yes, those pointers are necessary, I couldn't find a way to > > get > > > > rid > > > > > > of > > > > > > > > > them: > > > > > > > > > - prevSeqNo is used to emit penultimate element when > > retracting > > > > the > > > > > > last > > > > > > > > > one; > > > > > > > > > - nextSeqNo is used to keep prevSeqNo correct when > retracting > > > an > > > > > item > > > > > > > > from > > > > > > > > > the middle > > > > > > > > > > > > > > > > > > > And is `MapState.iterator` also feasible? 
> > > > > > > > > Yes, in fact, the ADAPTIVE strategy uses an iterator to > move > > > the > > > > > > > entries > > > > > > > > > between MAP and VALUE. > > > > > > > > > > > > > > > > > > > 3. I see there are two `RowData` stored for one record, > one > > > is > > > > in > > > > > > > > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess > > the > > > > > first > > > > > > is > > > > > > > > for > > > > > > > > > > upsert-keys. Would it be optimized to single copy for a > > > > > > > non-upsert-key > > > > > > > > > > scenario? > > > > > > > > > > > > > > > > > > That's an interesting idea! I'll try to dig into it deeper > > when > > > > > > > > > open-sourcing or as a follow-up. > > > > > > > > > > > > > > > > > > > 4. For the TTL mechanism part, I would suggest an > > 'event-time > > > > > based > > > > > > > > ttl', > > > > > > > > > > which allows the user to specify insertion time for each > > > > > > > insert/update > > > > > > > > > > operation and a manually controllable `TtlTimeProvider` > > > > (instead > > > > > of > > > > > > > > just > > > > > > > > > > system time). This would be beneficial for many cases, > > WDYT? > > > > > > > > > > > > > > > > > > I agree, I think that event-time based TTL is more useful > in > > > > > general > > > > > > > > > (I specified processing time as a default to make it less > > > > > surprising > > > > > > > for > > > > > > > > > the users). > > > > > > > > > > > > > > > > > > I don't immediately see the potential usages of a manually > > > > > > controllable > > > > > > > > > TtlTimeProvider - do you have any use cases in mind? > > > > > > > > > > > > > > > > > > > 5. Does the current RocksDB benchmark involve significant > > > state > > > > > > size > > > > > > > > and > > > > > > > > > > I/O pressure? 
> > > > > > > > > > > > > > > > > > No, in the micro-benchmark the state wasn't too big (in > order > > > of > > > > > > > > > megabytes); > > > > > > > > > It was bottlenecked by RocksDB put/get operations, however. > > > > > > > > > I also performed a benchmark on a cluster with a larger > state > > > > size > > > > > > > > > (in order of gigabytes) and got similar results. > > > > > > > > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > Roman > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan < > > > > > [email protected]> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Roman, > > > > > > > > > > > > > > > > > > > > Thanks for the proposal! The SinkUpsertMaterializer > > sometimes > > > > > > > becomes a > > > > > > > > > > bottleneck in our production, so I'd +1 to optimize it. I > > > have > > > > > > > several > > > > > > > > > > questions regarding your design: > > > > > > > > > > > > > > > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is > the > > > > > switch > > > > > > > > > between > > > > > > > > > > VALUE and MAP performed under each stream key considering > > > each > > > > > list > > > > > > > > size, > > > > > > > > > > or is it performed for all keys if the average list size > > > > reaches > > > > > > the > > > > > > > > > given > > > > > > > > > > thresholds? > > > > > > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and > > > > > 'nextSeqNo' > > > > > > > to > > > > > > > > > link > > > > > > > > > > all the nodes? I assume there should be a traversal need > > but > > > I > > > > > > don't > > > > > > > > see > > > > > > > > > > that in pseudo-code. And is `MapState.iterator` also > > > feasible? > > > > > > > > > > 3. I see there are two `RowData` stored for one record, > one > > > is > > > > in > > > > > > > > > > `rowToSqn` and another is in `sqnToNode`'s node. 
I guess > > the > > > > > first > > > > > > is > > > > > > > > for > > > > > > > > > > upsert-keys. Would it be optimized to single copy for a > > > > > > > non-upsert-key > > > > > > > > > > scenario? > > > > > > > > > > 4. For the TTL mechanism part, I would suggest an > > 'event-time > > > > > based > > > > > > > > ttl', > > > > > > > > > > which allows the user to specify insertion time for each > > > > > > > insert/update > > > > > > > > > > operation and a manually controllable `TtlTimeProvider` > > > > (instead > > > > > of > > > > > > > > just > > > > > > > > > > system time). This would be beneficial for many cases, > > WDYT? > > > > > > > > > > 5. Does the current RocksDB benchmark involve significant > > > state > > > > > > size > > > > > > > > and > > > > > > > > > > I/O pressure? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > Zakelly > > > > > > > > > > > > > > > > > > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan < > > > > > > [email protected]> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > > > > > I would like to start a discussion about FLIP-544 > > > > > > > > > SinkUpsertMaterializer > > > > > > > > > > V2 > > > > > > > > > > > [1]. > > > > > > > > > > > > > > > > > > > > > > SinkUpsertMaterializer is an operator in Flink that > > > > reconciles > > > > > > out > > > > > > > of > > > > > > > > > > order > > > > > > > > > > > changelog events before sending them to an upsert sink. > > In > > > > some > > > > > > > cases > > > > > > > > > > (that > > > > > > > > > > > we see in our production), performance of this operator > > > > > degrades > > > > > > > > > > > exponentially, depending on the input data. 
> > > > > > > > > > > This FLIP proposes a new implementation that is > optimized > > > for > > > > > > such > > > > > > > > > cases > > > > > > > > > > > and serves as a synchronization point for other efforts > > in > > > > that > > > > > > > area. > > > > > > > > > > > > > > > > > > > > > > Looking forward to feedback. > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > > > Roman > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
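For readers following the prevSeqNo/nextSeqNo discussion in this thread, the bookkeeping can be sketched as a toy, in-memory model. Plain HashMaps stand in for Flink's MapState, a String stands in for RowData, and distinct rows are assumed; only the names prevSeqNo, nextSeqNo, rowToSqn, and sqnToNode come from the thread — everything else is an illustrative assumption, not the FLIP's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the sequence-numbered list: each row gets an increasing seq
// number, and prev/next pointers make middle retraction O(1) (no traversal),
// while prevSeqNo lets us find the penultimate row when the last one goes.
class SeqNoList {

    static final long NO_SEQ = -1L;

    static class Node {
        final String row;  // stand-in for RowData
        long prevSeqNo;    // previous element; used to emit the penultimate one
        long nextSeqNo;    // next element; keeps prevSeqNo correct on middle removal
        Node(String row, long prev, long next) {
            this.row = row;
            this.prevSeqNo = prev;
            this.nextSeqNo = next;
        }
    }

    final Map<String, Long> rowToSqn = new HashMap<>(); // row -> its seq number
    final Map<Long, Node> sqnToNode = new HashMap<>();  // seq number -> node
    long nextSqn = 0;
    long lastSqn = NO_SEQ;

    void add(String row) {
        long sqn = nextSqn++;
        sqnToNode.put(sqn, new Node(row, lastSqn, NO_SEQ));
        if (lastSqn != NO_SEQ) {
            sqnToNode.get(lastSqn).nextSeqNo = sqn; // link previous tail forward
        }
        rowToSqn.put(row, sqn);
        lastSqn = sqn;
    }

    /** Retracts a row; returns the new last row to re-emit if the tail was removed, else null. */
    String retract(String row) {
        Long sqn = rowToSqn.remove(row);
        if (sqn == null) {
            return null; // unknown row, nothing to do
        }
        Node node = sqnToNode.remove(sqn);
        if (node.prevSeqNo != NO_SEQ) {
            sqnToNode.get(node.prevSeqNo).nextSeqNo = node.nextSeqNo;
        }
        if (node.nextSeqNo != NO_SEQ) {
            sqnToNode.get(node.nextSeqNo).prevSeqNo = node.prevSeqNo; // middle removal: O(1)
            return null;
        }
        lastSqn = node.prevSeqNo; // the tail was removed; penultimate becomes last
        return lastSqn == NO_SEQ ? null : sqnToNode.get(lastSqn).row;
    }
}
```

Retracting from the middle only touches the two neighboring nodes, which is why no list traversal appears in the FLIP's pseudo-code, and why both pointers are needed.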
