Thanks Piotrek for your valuable input! I will prepare follow-up FLIPs about faster checkpointing in the current async execution model and the new APIs, and I have added a brief description of this part to FLIP-423/424/425.
Regarding your concern:

> My main concern here is to prevent a situation where we end up with a
> duplicate code base of the operators:
> - the current set of operators that are well-behaving during
>   checkpointing, but are synchronous
> - some set of async operators that will be misbehaving during checkpoints

Yes, that's definitely what we should avoid. Let's thoroughly refine the
checkpointing behavior before the SQL operator rework in milestone 2.

Best,
Zakelly

On Wed, Mar 27, 2024 at 4:30 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi!
>
> Yes, after some long offline discussions we agreed to proceed as planned
> here, but we should treat the current API as experimental. The issue is
> that either we cannot checkpoint lambdas as they are currently defined,
> leading to problems caused by in-flight records draining under
> backpressure.
>
> <side comment>
> 6000 records is not far off from the number of records buffered in
> network buffers for smaller-parallelism backpressured jobs, and before
> unaligned checkpoints were implemented, even such jobs were often seen
> with checkpointing times exploding to tens of minutes or hours.
> And special handling of watermarks wouldn't solve the problem: the
> problem might be caused by an upstream window operator flushing records
> on a watermark, while the downstream operator that has those 6000
> in-flight records is also backpressured. Then processing of the watermark
> in the upstream operator will be tied to the downstream operator draining
> all of those in-flight records.
> </side comment>
>
> On the other hand, checkpointing in-flight requests with the current API
> would require serializing lambdas into the checkpointed state, which has
> significant limitations of its own:
> - unable to upgrade (including bug-fixing) the JRE
> - problems when updating dependencies of the code inside the lambdas, if
>   the updated dependencies are not binary compatible
> - no way to fix bugs inside the lambdas - users might get stuck in an
>   unrecoverable state
>
> After brainstorming a couple of different options, we came up with a
> couple of solutions to those issues; the one that we liked the most looks
> like:
>
> ```
> public void open() {
>     // name (bid), in type, consumer
>     consumer("GET", Void.class, (v) -> {
>         return getState("wordcount").get();
>     });
>     consumer("UPDATE", Integer.class, (v) -> {
>         return getState("wordcount").update(v == null ? 1 : v + 1);
>     });
>     consumer("OUT", Integer.class, (v) -> {
>         getContext().collect(v);
>     });
> }
>
> public void processElement() {
>     do("GET").then("UPDATE").then("OUT");
> }
> ```
>
> Where "GET", "UPDATE", "OUT" are some uids (`block-id`/`bid`). This way
> users could declare upfront, during the operator's startup, what
> method/function should handle a given `bid` in the current execution
> attempt. When checkpointing in-flight async state requests, Flink would
> store only the registered `bid`, not the serialized code itself. This
> would avoid the problems of serializing lambdas.
>
> However, to not postpone this effort, we reached a consensus that we can
> proceed with the current proposal and treat the currently proposed Async
> API (without declaring code upfront) as experimental/PoC: a test bed for
> the whole disaggregated state backend, not intended to be widely used in
> the code base. In parallel, in a follow-up FLIP, we could discuss the
> exact shape of the declarative async API that would actually be
> checkpointable.
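The bid-based scheme quoted above can be sketched in plain Java (all names here are hypothetical, not actual Flink API): callbacks are re-registered in `open()` on every execution attempt, so a checkpoint only ever needs to persist the string `bid` of each in-flight request, never the lambda itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the bid-based declarative API discussed above.
// Only the bid strings would be checkpointed; the lambdas are re-registered
// on every execution attempt, so no code is ever serialized into state.
public class BidRegistrySketch {
    private final Map<String, Function<Object, Object>> callbacks = new HashMap<>();

    // Analogous to the proposed consumer(bid, type, lambda) call in open().
    public void register(String bid, Function<Object, Object> callback) {
        callbacks.put(bid, callback);
    }

    // On restore, an in-flight request is resumed by looking up its bid.
    public Object invoke(String bid, Object input) {
        Function<Object, Object> cb = callbacks.get(bid);
        if (cb == null) {
            throw new IllegalStateException("No callback registered for bid: " + bid);
        }
        return cb.apply(input);
    }

    public static void main(String[] args) {
        BidRegistrySketch op = new BidRegistrySketch();
        // open(): declare handlers upfront, keyed by stable bids.
        op.register("GET", v -> 41);                   // stand-in for a state read
        op.register("UPDATE", v -> ((Integer) v) + 1); // stand-in for a state update
        // processElement(): chain "GET" then "UPDATE" by bid.
        Object value = op.invoke("UPDATE", op.invoke("GET", null));
        System.out.println(value); // prints 42
    }
}
```

The key property is that a restored checkpoint containing the pending pair ("UPDATE", 41) can be resumed against freshly registered lambdas, which is why users can still fix bugs in the callback bodies between restarts.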
My main concern here is to prevent a situation where we
> end up with a duplicate code base of the operators:
> - the current set of operators that are well-behaving during
>   checkpointing, but are synchronous
> - some set of async operators that will be misbehaving during checkpoints
>
> We should really try to avoid this scenario.
>
> Best,
> Piotrek
>
> On Wed, Mar 27, 2024 at 05:06 Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi devs,
> >
> > It seems there has been no more concern or suggestion for a while. We'd
> > like to start voting soon.
> >
> > Best,
> > Zakelly
> >
> > On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan <zakelly....@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > Piotr and I had a long discussion about the checkpoint duration issue.
> > > We think that the lambda serialization approach I proposed in my last
> > > mail may bring in more issues; the most serious one is that users may
> > > not be able to modify the code in a serialized lambda to perform a bug
> > > fix.
> > >
> > > But fortunately we found a possible solution. By introducing a set of
> > > declarative APIs and a `declareProcess()` function that users should
> > > implement in some newly introduced AbstractStreamOperator, we could
> > > get the declaration of record processing at runtime, broken down into
> > > requests and callbacks (lambdas) as FLIP-424 introduced. Thus we avoid
> > > the problem of lambda (de)serialization, and instead we retrieve the
> > > callbacks every time before a task runs. The next step is to provide
> > > an API allowing users to assign a unique id to each state request and
> > > callback, or to automatically assign one by declaration order. Thus we
> > > can find the corresponding callback at runtime for each restored state
> > > request based on its id, and then the whole pipeline can be resumed.
> > >
> > > Note that all of the above is internal and won't be exposed to average
> > > users.
> > > Exposing this on Stream APIs can be discussed later. I will prepare > > another > > > FLIP(s) describing the whole optimized checkpointing process, and in > the > > > meantime, we can proceed on current FLIPs. The new FLIP(s) are built on > > top > > > of current ones and we can achieve this incrementally. > > > > > > > > > Best, > > > Zakelly > > > > > > On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan <zakelly....@gmail.com> > > > wrote: > > > > > >> Hi Piotrek, > > >> > > >> Thanks for your comments! > > >> > > >> As we discussed off-line, you agreed that we can not checkpoint while > > some > > >>> records are in the middle of being > > >>> processed. That we would have to drain the in-progress records before > > >>> doing > > >>> the checkpoint. You also argued > > >>> that this is not a problem, because the size of this buffer can be > > >>> configured. > > >>> > > >>> I'm really afraid of such a solution. I've seen in the past plenty of > > >>> times, that whenever Flink has to drain some > > >>> buffered records, eventually that always brakes timely checkpointing > > (and > > >>> hence ability for Flink to rescale in > > >>> a timely manner). Even a single record with a `flatMap` like operator > > >>> currently in Flink causes problems during > > >>> back pressure. That's especially true for example for processing > > >>> watermarks. At the same time, I don't see how > > >>> this value could be configured by even Flink's power users, let alone > > an > > >>> average user. The size of that in-flight > > >>> buffer not only depends on a particular query/job, but also the > "good" > > >>> value changes dynamically over time, > > >>> and can change very rapidly. Sudden spikes of records or > backpressure, > > >>> some > > >>> hiccup during emitting watermarks, > > >>> all of those could change in an instant the theoretically optimal > > buffer > > >>> size of let's say "6000" records, down to "1". 
> > >>> And when those changes happen, those are the exact times when timely > > >>> checkpointing matters the most. > > >>> If the load pattern suddenly changes, and checkpointing takes > suddenly > > >>> tens > > >>> of minutes instead of a couple of > > >>> seconds, it means you can not use rescaling and you are forced to > > >>> overprovision the resources. And there also > > >>> other issues if checkpointing takes too long. > > >>> > > >> > > >> I'm gonna clarify some misunderstanding here. First of all, is the > sync > > >> phase of checkpointing for the current plan longer than the > synchronous > > >> execution model? The answer is yes, it is a trade-off for parallel > > >> execution model. I think the cost is worth the improvement. Now the > > >> question is, how much longer are we talking about? The PoC result I > > >> provided is that it takes 3 seconds to drain 6000 records of a simple > > job, > > >> and I said it is not a big deal. Even though you would say we may > > encounter > > >> long watermark/timer processing that make the cp wait, thus I provide > > >> several ways to optimize this: > > >> > > >> 1. Instead of only controlling the in-flight records, we could > > >> control the in-flight watermark. > > >> 2. Since we have broken down the record processing into several > state > > >> requests with at most one subsequent callback for each request, the > > cp can > > >> be processed after current RUNNING requests (NOT records) and their > > >> callbacks finish. Which means, even though we have a lot of records > > >> in-flight (I mean in 'processElement' here), once only a small > group > > of > > >> state requests finishes, the cp can proceed. They will form into 1 > > or 2 > > >> multiGets to rocksdb, which takes less time. Moreover, the timer > > processing > > >> is also split into several requests, so cp won't wait for the whole > > timer > > >> to finish. The picture attached describes this idea. 
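Point 2 above can be illustrated with a toy model (all names here are hypothetical, this is not the proposed implementation): the operator persists its buffered input records as-is, and the checkpoint's sync phase waits only for the small set of currently RUNNING state requests and their callbacks, not for the whole record buffer to drain.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the checkpointing idea described above (hypothetical names):
// buffered records are persisted unchanged, and the sync phase finishes only
// the currently running state requests, however large the record buffer is.
public class CheckpointDrainSketch {
    private final Deque<String> bufferedRecords = new ArrayDeque<>();
    private final Deque<Runnable> runningStateRequests = new ArrayDeque<>();

    public void bufferRecord(String record) { bufferedRecords.add(record); }
    public void submitStateRequest(Runnable request) { runningStateRequests.add(request); }
    public int bufferedCount() { return bufferedRecords.size(); }

    // Sync phase: complete only the in-flight state requests and callbacks.
    public int checkpoint() {
        int finished = 0;
        while (!runningStateRequests.isEmpty()) {
            runningStateRequests.poll().run();
            finished++;
        }
        // bufferedRecords would be written into the checkpoint as-is here.
        return finished;
    }

    public static void main(String[] args) {
        CheckpointDrainSketch op = new CheckpointDrainSketch();
        for (int i = 0; i < 6000; i++) op.bufferRecord("record-" + i);
        op.submitStateRequest(() -> {}); // e.g. one batched multiGet
        op.submitStateRequest(() -> {}); // plus its callback
        System.out.println(op.checkpoint());     // prints 2
        System.out.println(op.bufferedCount());  // prints 6000
    }
}
```

This is why, in the argument above, the 6000 buffered records stop mattering for the sync-phase duration: only the one or two outstanding state requests gate the checkpoint.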
> > >> > > >> And the size of this buffer can be configured. I'm not counting on > > >> average users to configure it well, I'm just saying that we'd better > not > > >> focus on absolute numbers of PoC or specific cases since we can always > > >> provide a conservative default value to make this acceptable for most > > >> cases.The adaptive buffer size is also worth a try if we provide a > > >> conservative strategy. > > >> > > >> Besides, I don't understand why the load pattern or spikes or > > >> backpressure will affect this. We are controlling the records that can > > get > > >> in the 'processElement' and the state requests that can fire in > > parallel, > > >> no matter how high the load spikes, they will be blocked outside. It > is > > >> relatively stable within the proposed execution model itself. The > > unaligned > > >> barrier will skip those inputs in the queue as before. > > >> > > >> > > >> At the same time, I still don't understand why we can not implement > > things > > >>> incrementally? First > > >>> let's start with the current API, without the need to rewrite all of > > the > > >>> operators, we can asynchronously fetch whole > > >>> state for a given record using its key. That should already vastly > > >>> improve > > >>> many things, and this way we could > > >>> perform a checkpoint without a need of draining the > > in-progress/in-flight > > >>> buffer. We could roll that version out, > > >>> test it out in practice, and then we could see if the fine grained > > state > > >>> access is really needed. Otherwise it sounds > > >>> to me like a premature optimization, that requires us to not only > > >>> rewrite a > > >>> lot of the code, but also to later maintain > > >>> it, even if it ultimately proves to be not needed. Which I of course > > can > > >>> not be certain but I have a feeling that it > > >>> might be the case. 
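The "asynchronously fetch whole state for a given record using its key" idea quoted above can be sketched with plain `CompletableFuture` (hypothetical names; the real FLIP-424 API differs, and the remote store is faked with a map): the state load is issued asynchronously, and the record is processed in a callback once the value arrives, instead of blocking the task thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java sketch of asynchronous whole-state fetch per key.
public class AsyncStateFetchSketch {
    private final Map<String, Integer> remoteStore = new ConcurrentHashMap<>();

    public AsyncStateFetchSketch() {
        remoteStore.put("word", 41); // pretend this lives on DFS
    }

    // Issue the state load without blocking the caller.
    public CompletableFuture<Integer> asyncGet(String key) {
        return CompletableFuture.supplyAsync(() -> remoteStore.getOrDefault(key, 0));
    }

    // Process a record once its state arrives; returns the updated count.
    public CompletableFuture<Integer> processElement(String key) {
        return asyncGet(key).thenApply(count -> {
            int updated = count + 1;
            remoteStore.put(key, updated);
            return updated;
        });
    }

    public static void main(String[] args) {
        System.out.println(new AsyncStateFetchSketch().processElement("word").join()); // prints 42
    }
}
```

Zakelly's counterpoint below is that this coarse-grained pre-fetching still ties I/O to CPU resources and may load unneeded state, which is the motivation for the finer-grained request model.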
> > >> > > >> > > >> The disaggregated state management we proposed is target at including > > but > > >> not limited to the following challenges: > > >> > > >> 1. Local disk constraints, including limited I/O and space. > > >> (discussed in FLIP-423) > > >> 2. Unbind the I/O resource with pre-allocated CPU resource, to make > > >> good use of both (motivation of FLIP-424) > > >> 3. Elasticity of scaling I/O or storage capacity. > > >> > > >> Thus our plan is dependent on DFS, using local disk as an optional > cache > > >> only. The pre-fetching plan you mentioned is still binding I/O with > CPU > > >> resources, and will consume even more I/O to load unnecessary state. > It > > >> makes things worse. Please note that we are not targeting some > scenarios > > >> where the local state could handle well, and our goal is not to > replace > > the > > >> local state. > > >> > > >> And If manpower is a big concern of yours, I would say many of my > > >> colleagues could help contribute in runtime or SQL operators. It is > > >> experimental on a separate code path other than the local state and > > will be > > >> recommended to use only when we prove it mature. > > >> > > >> > > >> Thanks & Best, > > >> Zakelly > > >> > > >> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski <pnowoj...@apache.org > > > > >> wrote: > > >> > > >>> Hey Zakelly! > > >>> > > >>> Sorry for the late reply. I still have concerns about the proposed > > >>> solution, with my main concerns coming from > > >>> the implications of the asynchronous state access API on the > > >>> checkpointing > > >>> and responsiveness of Flink. > > >>> > > >>> >> What also worries me a lot in this fine grained model is the > effect > > on > > >>> the checkpointing times. > > >>> > > > >>> > Your concerns are very reasonable. Faster checkpointing is always a > > >>> core > > >>> advantage of disaggregated state, > > >>> > but only for the async phase. 
There will be some complexity > > introduced > > >>> by > > >>> in-flight requests, but I'd suggest > > >>> > a checkpoint containing those in-flight state requests as part of > the > > >>> state, to accelerate the sync phase by > > >>> > skipping the buffer draining. This makes the buffer size have > little > > >>> impact on checkpoint time. And all the > > >>> > changes keep within the execution model we proposed while the > > >>> checkpoint > > >>> barrier alignment or handling > > >>> > will not be touched in our proposal, so I guess the complexity is > > >>> relatively controllable. I have faith in that :) > > >>> > > >>> As we discussed off-line, you agreed that we can not checkpoint while > > >>> some > > >>> records are in the middle of being > > >>> processed. That we would have to drain the in-progress records before > > >>> doing > > >>> the checkpoint. You also argued > > >>> that this is not a problem, because the size of this buffer can be > > >>> configured. > > >>> > > >>> I'm really afraid of such a solution. I've seen in the past plenty of > > >>> times, that whenever Flink has to drain some > > >>> buffered records, eventually that always brakes timely checkpointing > > (and > > >>> hence ability for Flink to rescale in > > >>> a timely manner). Even a single record with a `flatMap` like operator > > >>> currently in Flink causes problems during > > >>> back pressure. That's especially true for example for processing > > >>> watermarks. At the same time, I don't see how > > >>> this value could be configured by even Flink's power users, let alone > > an > > >>> average user. The size of that in-flight > > >>> buffer not only depends on a particular query/job, but also the > "good" > > >>> value changes dynamically over time, > > >>> and can change very rapidly. 
Sudden spikes of records or > backpressure, > > >>> some > > >>> hiccup during emitting watermarks, > > >>> all of those could change in an instant the theoretically optimal > > buffer > > >>> size of let's say "6000" records, down to "1". > > >>> And when those changes happen, those are the exact times when timely > > >>> checkpointing matters the most. > > >>> If the load pattern suddenly changes, and checkpointing takes > suddenly > > >>> tens > > >>> of minutes instead of a couple of > > >>> seconds, it means you can not use rescaling and you are forced to > > >>> overprovision the resources. And there also > > >>> other issues if checkpointing takes too long. > > >>> > > >>> At the same time, I still don't understand why we can not implement > > >>> things > > >>> incrementally? First > > >>> let's start with the current API, without the need to rewrite all of > > the > > >>> operators, we can asynchronously fetch whole > > >>> state for a given record using its key. That should already vastly > > >>> improve > > >>> many things, and this way we could > > >>> perform a checkpoint without a need of draining the > > in-progress/in-flight > > >>> buffer. We could roll that version out, > > >>> test it out in practice, and then we could see if the fine grained > > state > > >>> access is really needed. Otherwise it sounds > > >>> to me like a premature optimization, that requires us to not only > > >>> rewrite a > > >>> lot of the code, but also to later maintain > > >>> it, even if it ultimately proves to be not needed. Which I of course > > can > > >>> not be certain but I have a feeling that it > > >>> might be the case. > > >>> > > >>> Best, > > >>> Piotrek > > >>> > > >>> wt., 19 mar 2024 o 10:42 Zakelly Lan <zakelly....@gmail.com> > > napisał(a): > > >>> > > >>> > Hi everyone, > > >>> > > > >>> > Thanks for your valuable feedback! > > >>> > > > >>> > Our discussions have been going on for a while and are nearing a > > >>> > consensus. 
So I would like to start a vote after 72 hours. > > >>> > > > >>> > Please let me know if you have any concerns, thanks! > > >>> > > > >>> > > > >>> > Best, > > >>> > Zakelly > > >>> > > > >>> > On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com > > > > >>> wrote: > > >>> > > > >>> > > Hi Yunfeng, > > >>> > > > > >>> > > Thanks for the suggestion! > > >>> > > > > >>> > > I will reorganize the FLIP-425 accordingly. > > >>> > > > > >>> > > > > >>> > > Best, > > >>> > > Zakelly > > >>> > > > > >>> > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou < > > >>> > flink.zhouyunf...@gmail.com> > > >>> > > wrote: > > >>> > > > > >>> > >> Hi Xintong and Zakelly, > > >>> > >> > > >>> > >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks > > >>> > >> I agree with it that watermarks can use only out-of-order mode > for > > >>> > >> now, because there is still not a concrete example showing the > > >>> > >> correctness risk about it. However, the strictly-ordered mode > > should > > >>> > >> still be supported as the default option for non-record event > > types > > >>> > >> other than watermark, at least for checkpoint barriers. > > >>> > >> > > >>> > >> I noticed that this information has already been documented in > > "For > > >>> > >> other non-record events, such as RecordAttributes ...", but it's > > at > > >>> > >> the bottom of the "Watermark" section, which might not be very > > >>> > >> obvious. Thus it might be better to reorganize the FLIP to > better > > >>> > >> claim that the two order modes are designed for all non-record > > >>> events, > > >>> > >> and which mode this FLIP would choose for each type of event. > > >>> > >> > > >>> > >> Best, > > >>> > >> Yunfeng > > >>> > >> > > >>> > >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song < > > tonysong...@gmail.com > > >>> > > > >>> > >> wrote: > > >>> > >> > > > >>> > >> > Thanks for the quick response. Sounds good to me. 
> > >>> > >> > > > >>> > >> > Best, > > >>> > >> > > > >>> > >> > Xintong > > >>> > >> > > > >>> > >> > > > >>> > >> > > > >>> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan < > > >>> zakelly....@gmail.com> > > >>> > >> wrote: > > >>> > >> > > > >>> > >> > > Hi Xintong, > > >>> > >> > > > > >>> > >> > > Thanks for sharing your thoughts! > > >>> > >> > > > > >>> > >> > > 1. Regarding Record-ordered and State-ordered of > > processElement. > > >>> > >> > > > > > >>> > >> > > > I understand that while State-ordered likely provides > better > > >>> > >> performance, > > >>> > >> > > > Record-ordered is sometimes required for correctness. The > > >>> question > > >>> > >> is how > > >>> > >> > > > should a user choose between these two modes? My concern > is > > >>> that > > >>> > >> such a > > >>> > >> > > > decision may require users to have in-depth knowledge > about > > >>> the > > >>> > >> Flink > > >>> > >> > > > internals, and may lead to correctness issues if > > >>> State-ordered is > > >>> > >> chosen > > >>> > >> > > > improperly. > > >>> > >> > > > > > >>> > >> > > > I'd suggest not to expose such a knob, at least in the > first > > >>> > >> version. > > >>> > >> > > That > > >>> > >> > > > means always use Record-ordered for custom operators / > UDFs, > > >>> and > > >>> > >> keep > > >>> > >> > > > State-ordered for internal usages (built-in operators) > only. > > >>> > >> > > > > > >>> > >> > > > > >>> > >> > > Indeed, users may not be able to choose the mode properly. I > > >>> agree > > >>> > to > > >>> > >> keep > > >>> > >> > > such options for internal use. > > >>> > >> > > > > >>> > >> > > > > >>> > >> > > 2. Regarding Strictly-ordered and Out-of-order of > Watermarks. > > >>> > >> > > > > > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the > > >>> default, or > > >>> > >> even > > >>> > >> > > > being supported. 
From my understanding, a Watermark(T) > only > > >>> > >> suggests that > > >>> > >> > > > all records with event time before T has arrived, and it > has > > >>> > >> nothing to > > >>> > >> > > do > > >>> > >> > > > with whether records with event time after T has arrived > or > > >>> not. > > >>> > >> From > > >>> > >> > > that > > >>> > >> > > > perspective, preventing certain records from arriving > > before a > > >>> > >> Watermark > > >>> > >> > > is > > >>> > >> > > > never supported. I also cannot come up with any use case > > where > > >>> > >> > > > Strictly-ordered is necessary. This implies the same issue > > as > > >>> 1): > > >>> > >> how > > >>> > >> > > does > > >>> > >> > > > the user choose between the two modes? > > >>> > >> > > > > > >>> > >> > > > I'd suggest not expose the knob to users and only support > > >>> > >> Out-of-order, > > >>> > >> > > > until we see a concrete use case that Strictly-ordered is > > >>> needed. > > >>> > >> > > > > > >>> > >> > > > > >>> > >> > > The semantics of watermarks do not define the sequence > > between a > > >>> > >> watermark > > >>> > >> > > and subsequent records. For the most part, this is > > >>> inconsequential, > > >>> > >> except > > >>> > >> > > it may affect some current users who have previously relied > on > > >>> the > > >>> > >> implicit > > >>> > >> > > assumption of an ordered execution. I'd be fine with > initially > > >>> > >> supporting > > >>> > >> > > only out-of-order processing. We may consider exposing the > > >>> > >> > > 'Strictly-ordered' mode once we encounter a concrete use > case > > >>> that > > >>> > >> > > necessitates it. > > >>> > >> > > > > >>> > >> > > > > >>> > >> > > My philosophies behind not exposing the two config options > > are: > > >>> > >> > > > - There are already too many options in Flink that barely > > >>> know how > > >>> > >> to use > > >>> > >> > > > them. 
I think Flink should try as much as possible to > decide > > >>> its > > >>> > own > > >>> > >> > > > behavior, rather than throwing all the decisions to the > > users. > > >>> > >> > > > - It's much harder to take back knobs than to introduce > > them. > > >>> > >> Therefore, > > >>> > >> > > > options should be introduced only if concrete use cases > are > > >>> > >> identified. > > >>> > >> > > > > > >>> > >> > > > > >>> > >> > > I agree to keep minimal configurable items especially for > the > > >>> MVP. > > >>> > >> Given > > >>> > >> > > that we have the opportunity to refine the functionality > > before > > >>> the > > >>> > >> > > framework transitions from @Experimental to @PublicEvolving, > > it > > >>> > makes > > >>> > >> sense > > >>> > >> > > to refrain from presenting user-facing options until we have > > >>> ensured > > >>> > >> > > their necessity. > > >>> > >> > > > > >>> > >> > > > > >>> > >> > > Best, > > >>> > >> > > Zakelly > > >>> > >> > > > > >>> > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song < > > >>> > tonysong...@gmail.com> > > >>> > >> > > wrote: > > >>> > >> > > > > >>> > >> > > > Sorry for joining the discussion late. > > >>> > >> > > > > > >>> > >> > > > I have two questions about FLIP-425. > > >>> > >> > > > > > >>> > >> > > > 1. Regarding Record-ordered and State-ordered of > > >>> processElement. > > >>> > >> > > > > > >>> > >> > > > I understand that while State-ordered likely provides > better > > >>> > >> performance, > > >>> > >> > > > Record-ordered is sometimes required for correctness. The > > >>> question > > >>> > >> is how > > >>> > >> > > > should a user choose between these two modes? My concern > is > > >>> that > > >>> > >> such a > > >>> > >> > > > decision may require users to have in-depth knowledge > about > > >>> the > > >>> > >> Flink > > >>> > >> > > > internals, and may lead to correctness issues if > > >>> State-ordered is > > >>> > >> chosen > > >>> > >> > > > improperly. 
> > >>> > >> > > > > > >>> > >> > > > I'd suggest not to expose such a knob, at least in the > first > > >>> > >> version. > > >>> > >> > > That > > >>> > >> > > > means always use Record-ordered for custom operators / > UDFs, > > >>> and > > >>> > >> keep > > >>> > >> > > > State-ordered for internal usages (built-in operators) > only. > > >>> > >> > > > > > >>> > >> > > > 2. Regarding Strictly-ordered and Out-of-order of > > Watermarks. > > >>> > >> > > > > > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the > > >>> default, or > > >>> > >> even > > >>> > >> > > > being supported. From my understanding, a Watermark(T) > only > > >>> > >> suggests that > > >>> > >> > > > all records with event time before T has arrived, and it > has > > >>> > >> nothing to > > >>> > >> > > do > > >>> > >> > > > with whether records with event time after T has arrived > or > > >>> not. > > >>> > >> From > > >>> > >> > > that > > >>> > >> > > > perspective, preventing certain records from arriving > > before a > > >>> > >> Watermark > > >>> > >> > > is > > >>> > >> > > > never supported. I also cannot come up with any use case > > where > > >>> > >> > > > Strictly-ordered is necessary. This implies the same issue > > as > > >>> 1): > > >>> > >> how > > >>> > >> > > does > > >>> > >> > > > the user choose between the two modes? > > >>> > >> > > > > > >>> > >> > > > I'd suggest not expose the knob to users and only support > > >>> > >> Out-of-order, > > >>> > >> > > > until we see a concrete use case that Strictly-ordered is > > >>> needed. > > >>> > >> > > > > > >>> > >> > > > > > >>> > >> > > > My philosophies behind not exposing the two config options > > >>> are: > > >>> > >> > > > - There are already too many options in Flink that barely > > >>> know how > > >>> > >> to use > > >>> > >> > > > them. 
I think Flink should try as much as possible to > decide > > >>> its > > >>> > own > > >>> > >> > > > behavior, rather than throwing all the decisions to the > > users. > > >>> > >> > > > - It's much harder to take back knobs than to introduce > > them. > > >>> > >> Therefore, > > >>> > >> > > > options should be introduced only if concrete use cases > are > > >>> > >> identified. > > >>> > >> > > > > > >>> > >> > > > WDYT? > > >>> > >> > > > > > >>> > >> > > > Best, > > >>> > >> > > > > > >>> > >> > > > Xintong > > >>> > >> > > > > > >>> > >> > > > > > >>> > >> > > > > > >>> > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge > > >>> <j...@ververica.com.invalid > > >>> > > > > >>> > >> > > wrote: > > >>> > >> > > > > > >>> > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 > which > > >>> > >> introduced > > >>> > >> > > the > > >>> > >> > > > > intention of the big change and high level architecture. > > >>> Great > > >>> > >> content > > >>> > >> > > > btw! > > >>> > >> > > > > The only public interface change for this FLIP is one > new > > >>> config > > >>> > >> to use > > >>> > >> > > > > ForSt. It makes sense to have one dedicated discussion > > >>> thread > > >>> > for > > >>> > >> each > > >>> > >> > > > > concrete system design. > > >>> > >> > > > > > > >>> > >> > > > > @Zakelly The links in your mail do not work except the > > last > > >>> one, > > >>> > >> > > because > > >>> > >> > > > > the FLIP-xxx has been included into the url like > > >>> > >> > > > > > > >>> > >> > > > > >>> > >> > > >>> > > > >>> > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 > > >>> > >> > > > . 
> > >>> > >> > > > > > > >>> > >> > > > > NIT fix: > > >>> > >> > > > > > > >>> > >> > > > > FLIP-424: > > >>> > >> > > > > > >>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864 > > >>> > >> > > > > > > >>> > >> > > > > FLIP-425: > > >>> > >> > > > > > >>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h > > >>> > >> > > > > > > >>> > >> > > > > FLIP-426: > > >>> > >> > > > > > >>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf > > >>> > >> > > > > > > >>> > >> > > > > FLIP-427: > > >>> > >> > > > > > >>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft > > >>> > >> > > > > > > >>> > >> > > > > FLIP-428: > > >>> > >> > > > > > >>> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b > > >>> > >> > > > > > > >>> > >> > > > > Best regards, > > >>> > >> > > > > Jing > > >>> > >> > > > > > > >>> > >> > > > > > > >>> > >> > > > > > > >>> > >> > > > > > > >>> > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan < > > >>> > >> zakelly....@gmail.com> > > >>> > >> > > > wrote: > > >>> > >> > > > > > > >>> > >> > > > > > Hi everyone, > > >>> > >> > > > > > > > >>> > >> > > > > > Thank you all for a lively discussion here, and it is > a > > >>> good > > >>> > >> time to > > >>> > >> > > > move > > >>> > >> > > > > > forward to more detailed discussions. 
Thus we open > > several > > >>> > >> threads > > >>> > >> > > for > > >>> > >> > > > > > sub-FLIPs: > > >>> > >> > > > > > > > >>> > >> > > > > > FLIP-424: > > >>> > >> > > > > > > >>> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864 > > >>> > >> > > > > > FLIP-425 > > >>> > >> > > > > > < > > >>> > >> > > > > > > >>> > >> > > > > >>> > >> > > >>> > > > >>> > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 > > >>> > >> > > > >: > > >>> > >> > > > > > > > >>> > >> > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h > > >>> > >> > > > > > FLIP-426 > > >>> > >> > > > > > < > > >>> > >> > > > > > > >>> > >> > > > > >>> > >> > > >>> > > > >>> > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426 > > >>> > >> > > > >: > > >>> > >> > > > > > > > >>> > >> > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf > > >>> > >> > > > > > FLIP-427 > > >>> > >> > > > > > < > > >>> > >> > > > > > > >>> > >> > > > > >>> > >> > > >>> > > > >>> > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427 > > >>> > >> > > > >: > > >>> > >> > > > > > > > >>> > >> > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft > > >>> > >> > > > > > FLIP-428 > > >>> > >> > > > > > < > > >>> > >> > > > > > > >>> > >> > > > > >>> > >> > > >>> > > > >>> > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428 > > >>> > >> > > > >: > > >>> > >> > > > > > > > >>> > >> > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b > > >>> > >> > > > > > > > >>> > >> > > > > > If you want to talk about the overall architecture, > > >>> roadmap, > > >>> > >> > > milestones > > >>> > >> > > > > or > > >>> > >> > > > > > something related with multiple FLIPs, please post it > > >>> here. > > >>> > >> Otherwise > > >>> > >> > > > you > > >>> > >> > > > > > can discuss some details in separate mails. 
Let's try > to > > >>> avoid > > >>> > >> > > repeated > > >>> > >> > > > > > discussion in different threads. I will sync important > > >>> > messages > > >>> > >> here > > >>> > >> > > if > > >>> > >> > > > > > there are any in the above threads. > > >>> > >> > > > > > > > >>> > >> > > > > > And reply to @Jeyhun: We will ensure the content > between > > >>> those > > >>> > >> FLIPs > > >>> > >> > > is > > >>> > >> > > > > > consistent. > > >>> > >> > > > > > > > >>> > >> > > > > > > > >>> > >> > > > > > Best, > > >>> > >> > > > > > Zakelly > > >>> > >> > > > > > > > >>> > >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei < > > >>> > yuanmei.w...@gmail.com > > >>> > >> > > > >>> > >> > > > wrote: > > >>> > >> > > > > > > > >>> > >> > > > > > > I have been a bit busy these few weeks and sorry for > > >>> > >> responding > > >>> > >> > > late. > > >>> > >> > > > > > > > > >>> > >> > > > > > > The original thinking of keeping discussion within > one > > >>> > thread > > >>> > >> is > > >>> > >> > > for > > >>> > >> > > > > > easier > > >>> > >> > > > > > > tracking and avoid for repeated discussion in > > different > > >>> > >> threads. > > >>> > >> > > > > > > > > >>> > >> > > > > > > For details, It might be good to start in different > > >>> threads > > >>> > if > > >>> > >> > > > needed. > > >>> > >> > > > > > > > > >>> > >> > > > > > > We will think of a way to better organize the > > >>> discussion. > > >>> > >> > > > > > > > > >>> > >> > > > > > > Best > > >>> > >> > > > > > > Yuan > > >>> > >> > > > > > > > > >>> > >> > > > > > > > > >>> > >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov < > > >>> > >> > > je.kari...@gmail.com> > > >>> > >> > > > > > > wrote: > > >>> > >> > > > > > > > > >>> > >> > > > > > > > Hi, > > >>> > >> > > > > > > > > > >>> > >> > > > > > > > + 1 for the suggestion. 
Maybe we can start the discussion with the FLIPs that have minimum dependencies on the other new/proposed FLIPs. Based on our discussion of a particular FLIP, the subsequent (or dependent) FLIP(s) can be updated accordingly?

Regards,
Jeyhun

On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hey all!

This is a massive improvement / work. I just started going through the FLIPs and have a more or less meta comment.

While it's good to keep the overall architecture discussion here, I think we should still have separate discussions for each FLIP where we can discuss interface details etc. With so much content, adding minor comments here will lead nowhere, but those discussions are still important and we should have them in separate threads (one for each FLIP).

What do you think?
Gyula

On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:

Hi team,

Thanks for your discussion. Regarding FLIP-425, we have supplemented several updates to answer high-frequency questions:

1. We captured a flame graph of the HashMap state backend in "Synchronous execution with asynchronous APIs"[1], which reveals that the framework overhead (including reference counting, future-related code and so on) consumes about 9% of the keyed operator CPU time.
2. We added a set of comparative experiments for watermark processing; the performance of out-of-order mode is 70% better than strictly-ordered mode under ~140MB state size. Instructions on how to run this test have also been added[2].
3. Regarding the order of StreamRecords, whether they have state access or not: we supplemented a new *Strict order of 'processElement'*[3].
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder

Best regards,
Yanfei

On Tue, Mar 5, 2024 at 09:25 Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi Zakelly,

> 5. I'm not very sure ... revisiting this later since it is not important.
It seems that we still have some details to confirm about this question. Let's postpone this to after the critical parts of the design are settled.

> 8. Yes, we had considered ... metrics should be like afterwards.

Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in milestone 2.

Looking forward to the detailed design of the strict mode between same-key records and the benchmark results for the epoch mechanism.

Best regards,
Yunfeng

On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Yunfeng,

For 1:
I had a discussion with Lincoln Lee, and I realized it is a common case that same-key records should be blocked before `processElement`.
It is easier for users to understand. Thus I will introduce a strict mode for this and make it the default. My rough idea is just like yours: invoke some method on the AEC instance before `processElement`. The detailed design will be described in the FLIP later.

For 2:
I agree with you. We could throw exceptions for now and optimize this later.

For 5:
>> It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.

Agreed. The whole framework is under experiment until we consider it complete in 2.0 or later.
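To make the same-key blocking idea concrete, here is a minimal sketch of the bookkeeping involved. This is not the actual AEC implementation from the FLIPs; the class `KeyAccounting` and its methods are invented for illustration. The idea: before `processElement`, a record must "occupy" its key; if the key is already occupied by an in-flight record, the new record is buffered until the in-flight one finishes all its async state requests.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Illustrative sketch of same-key blocking before processElement (not the real AEC code). */
class KeyAccounting<K, R> {
    private final Set<K> inFlightKeys = new HashSet<>();
    private final Map<K, Queue<R>> blockedRecords = new HashMap<>();

    /** Returns true if the record may enter processElement now; otherwise buffers it. */
    boolean tryOccupy(K key, R record) {
        if (inFlightKeys.add(key)) {
            return true; // no same-key record in flight: proceed immediately
        }
        blockedRecords.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
        return false; // blocked until the in-flight record's async requests finish
    }

    /** Called when all async state requests for a key are done; hands back the next blocked record, if any. */
    R release(K key) {
        Queue<R> queue = blockedRecords.get(key);
        if (queue == null || queue.isEmpty()) {
            inFlightKeys.remove(key); // key is free again
            return null;
        }
        return queue.poll(); // the key stays occupied by the next same-key record
    }

    public static void main(String[] args) {
        KeyAccounting<String, String> acc = new KeyAccounting<>();
        System.out.println(acc.tryOccupy("k1", "r1")); // true: r1 enters processElement
        System.out.println(acc.tryOccupy("k1", "r2")); // false: r2 blocked behind r1
        System.out.println(acc.release("k1"));         // r2: unblocked once r1 completes
    }
}
```

Records with different keys are unaffected, so out-of-order processing across keys is preserved while same-key order stays strict.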
The default values should be determined with more testing results and production experience.

>> The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?

I'm not very sure whether there is any use case where users write their code against the async APIs but run their job in a synchronous way. The first two scenarios that come to mind are benchmarking, or a small state where users don't want to rewrite their code. Actually it is easy to support, so I'd suggest providing it. But I'm fine with revisiting this later since it is not important. WDYT?
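One way to see why running async-style code synchronously is "easy to support": a synchronous backend can hand out already-completed futures whose callbacks run inline, so the user code does not change. The sketch below uses invented types (`MiniStateFuture`, `CompletedStateFuture`), not the actual FLIP-424 `StateFuture` API.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Minimal future-like handle, invented for illustration (not the real FLIP-424 StateFuture). */
interface MiniStateFuture<T> {
    <R> MiniStateFuture<R> thenApply(Function<? super T, ? extends R> fn);
    void thenAccept(Consumer<? super T> action);
}

/** A synchronous backend returns an already-completed future; callbacks run inline. */
class CompletedStateFuture<T> implements MiniStateFuture<T> {
    private final T value;

    CompletedStateFuture(T value) {
        this.value = value;
    }

    @Override
    public <R> MiniStateFuture<R> thenApply(Function<? super T, ? extends R> fn) {
        return new CompletedStateFuture<>(fn.apply(value)); // no queueing, no callback registry
    }

    @Override
    public void thenAccept(Consumer<? super T> action) {
        action.accept(value); // runs immediately on the calling thread
    }

    public static void main(String[] args) {
        // User code written against the async-style API, executed fully synchronously:
        MiniStateFuture<Integer> count = new CompletedStateFuture<>(41);
        count.thenApply(c -> c + 1).thenAccept(System.out::println); // prints 42
    }
}
```

An asynchronous backend would instead return futures completed later by the AEC, but the chained user code stays identical in both cases.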
For 8:
Yes, we have considered the I/O metrics group, especially back-pressure, idle and task busy per second. In the current plan we can do state access during back-pressure, meaning that those input metrics would better be redefined. I suggest we discuss these existing metrics, as well as some new metrics that should be introduced in FLIP-431, later in milestone 2; since we will have basically finished the framework by then, we will have a better view of what the metrics should look like. WDYT?

Best,
Zakelly

On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi Zakelly,

Thanks for the responses!

> 1. I will discuss this with some expert SQL developers. ...
> ... mode for StreamRecord processing.

In the DataStream API there should also be use cases where the order of output is strictly required. I agree that SQL experts may help provide more concrete use cases that can accelerate our discussion, but please allow me to search for DataStream use cases that can prove the necessity of this strict order preservation mode, in case the answers from the SQL experts turn out to be negative.

For your convenience, my current rough idea is that we can add a module between the Input(s) and the processElement() module in Fig 2 of FLIP-425.
The module will be responsible for caching records whose keys collide with in-flight records, and the AEC will only be responsible for handling async state calls, without knowing which record each call belongs to. We may revisit this topic once the necessity of the strict order mode is clarified.

> 2. The amount of parallel StateRequests ... instead of invoking yield

Your suggestions generally appeal to me. I think we may let the corresponding Flink jobs fail with OOM for now, since the majority of a StateRequest should just be references to existing Java objects, which occupy very little memory and can hardly cause OOM in common cases.
We can monitor the pending StateRequests, and if there really is a risk of OOM in extreme cases, we can throw exceptions with proper messages notifying users what to do, like increasing memory through configurations.

Your suggestions to adjust the threshold adaptively or to use the blocking buffer sound good, and in my opinion we can postpone them to future FLIPs since they seem to benefit users only in rare cases. Given that FLIP-423~428 is already a big enough design, it might be better to focus on the most critical parts for now and postpone optimizations like this. WDYT?

> 5. Sure, we will introduce new configs as well as their default values.
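The "monitor pending requests and fail with a proper message" idea above could look roughly like the following. This is an illustrative sketch only: `PendingRequestGuard` and the config key named in the message are invented for the example, not taken from the FLIPs.

```java
/** Illustrative guard that fails fast with a descriptive message instead of a bare OOM. */
class PendingRequestGuard {
    private final int limit;
    private int pending;

    PendingRequestGuard(int limit) {
        this.limit = limit;
    }

    /** Called when a StateRequest is queued; throws a descriptive error past the limit. */
    void onRequestSubmitted() {
        if (++pending > limit) {
            throw new IllegalStateException(
                "Too many pending async state requests (" + pending + " > " + limit + "). "
                    + "Consider increasing task memory or the in-flight request limit "
                    + "(hypothetical config: execution.async-state.max-in-flight-records).");
        }
    }

    /** Called when a StateRequest completes and its slot is freed. */
    void onRequestCompleted() {
        pending--;
    }

    int pending() {
        return pending;
    }

    public static void main(String[] args) {
        PendingRequestGuard guard = new PendingRequestGuard(2);
        guard.onRequestSubmitted();
        guard.onRequestSubmitted();
        try {
            guard.onRequestSubmitted(); // third request exceeds the limit
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

An adaptive threshold or spilling into the blocking buffer, as discussed above, could later replace the hard failure without changing the call sites.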
Thanks for adding the default values; the values themselves LGTM. It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in the future.

The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?

Besides, inspired by Jeyhun's comments, it comes to me that:

8. Should this FLIP introduce metrics that measure the time a Flink job is back-pressured by state I/O?
Under the current design, this metric could measure the time when the blocking buffer is full and yield() cannot get callbacks to process, which means the operator is fully waiting for state responses.

Best regards,
Yunfeng

On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Yunfeng,

Thanks for your detailed comments!

>> 1. Why do we need a close() method on StateIterator? This method seems unused in the usage example codes.

The `close()` is introduced to release internal resources, but it does not seem to require the user to call it. I removed this.

>> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated