Hi devs,

It seems there have been no further concerns or suggestions for a while, so we'd like to start the vote soon.
Best, Zakelly On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan <zakelly....@gmail.com> wrote: > Hi everyone, > > Piotr and I had a long discussion about the checkpoint duration issue. We > think that the lambda serialization approach I proposed in last mail may > bring in more issues, the most serious one is that users may not be able to > modify their code in serialized lambda to perform a bug fix. > > But fortunately we found a possible solution. By introducing a set of > declarative APIs and a `declareProcess()` function that users should > implement in some newly introduced AbstractStreamOperator, we could get the > declaration of record processing in runtime, broken down to requests and > callbacks (lambdas) like FLIP-424 introduced. Thus we avoid the problem of > lambda (de)serialization and instead we retrieve callbacks every time > before a task runs. The next step is to provide an API allowing users to > assign an unique id to each state request and callback, or automatically > assign one by declaration order. Thus we can find the corresponding > callback in runtime for each restored state request based on the id, then > the whole pipeline can be resumed. > > Note that all these above are internal and won't expose to average users. > Exposing this on Stream APIs can be discussed later. I will prepare another > FLIP(s) describing the whole optimized checkpointing process, and in the > meantime, we can proceed on current FLIPs. The new FLIP(s) are built on top > of current ones and we can achieve this incrementally. > > > Best, > Zakelly > > On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan <zakelly....@gmail.com> > wrote: > >> Hi Piotrek, >> >> Thanks for your comments! >> >> As we discussed off-line, you agreed that we can not checkpoint while some >>> records are in the middle of being >>> processed. That we would have to drain the in-progress records before >>> doing >>> the checkpoint. You also argued >>> that this is not a problem, because the size of this buffer can be >>> configured. >>> >>> I'm really afraid of such a solution. I've seen in the past plenty of >>> times, that whenever Flink has to drain some >>> buffered records, eventually that always brakes timely checkpointing (and >>> hence ability for Flink to rescale in >>> a timely manner). Even a single record with a `flatMap` like operator >>> currently in Flink causes problems during >>> back pressure. That's especially true for example for processing >>> watermarks. At the same time, I don't see how >>> this value could be configured by even Flink's power users, let alone an >>> average user. The size of that in-flight >>> buffer not only depends on a particular query/job, but also the "good" >>> value changes dynamically over time, >>> and can change very rapidly. Sudden spikes of records or backpressure, >>> some >>> hiccup during emitting watermarks, >>> all of those could change in an instant the theoretically optimal buffer >>> size of let's say "6000" records, down to "1". >>> And when those changes happen, those are the exact times when timely >>> checkpointing matters the most. >>> If the load pattern suddenly changes, and checkpointing takes suddenly >>> tens >>> of minutes instead of a couple of >>> seconds, it means you can not use rescaling and you are forced to >>> overprovision the resources. And there also >>> other issues if checkpointing takes too long. >>> >> >> I'm gonna clarify some misunderstanding here. 
First of all, is the sync >> phase of checkpointing for the current plan longer than the synchronous >> execution model? The answer is yes, it is a trade-off for parallel >> execution model. I think the cost is worth the improvement. Now the >> question is, how much longer are we talking about? The PoC result I >> provided is that it takes 3 seconds to drain 6000 records of a simple job, >> and I said it is not a big deal. Even though you would say we may encounter >> long watermark/timer processing that make the cp wait, thus I provide >> several ways to optimize this: >> >> 1. Instead of only controlling the in-flight records, we could >> control the in-flight watermark. >> 2. Since we have broken down the record processing into several state >> requests with at most one subsequent callback for each request, the cp can >> be processed after current RUNNING requests (NOT records) and their >> callbacks finish. Which means, even though we have a lot of records >> in-flight (I mean in 'processElement' here), once only a small group of >> state requests finishes, the cp can proceed. They will form into 1 or 2 >> multiGets to rocksdb, which takes less time. Moreover, the timer >> processing >> is also split into several requests, so cp won't wait for the whole timer >> to finish. The picture attached describes this idea. >> >> And the size of this buffer can be configured. I'm not counting on >> average users to configure it well, I'm just saying that we'd better not >> focus on absolute numbers of PoC or specific cases since we can always >> provide a conservative default value to make this acceptable for most >> cases.The adaptive buffer size is also worth a try if we provide a >> conservative strategy. >> >> Besides, I don't understand why the load pattern or spikes or >> backpressure will affect this. We are controlling the records that can get >> in the 'processElement' and the state requests that can fire in parallel, >> no matter how high the load spikes, they will be blocked outside. It is >> relatively stable within the proposed execution model itself. The unaligned >> barrier will skip those inputs in the queue as before. >> >> >> At the same time, I still don't understand why we can not implement things >>> incrementally? First >>> let's start with the current API, without the need to rewrite all of the >>> operators, we can asynchronously fetch whole >>> state for a given record using its key. That should already vastly >>> improve >>> many things, and this way we could >>> perform a checkpoint without a need of draining the in-progress/in-flight >>> buffer. We could roll that version out, >>> test it out in practice, and then we could see if the fine grained state >>> access is really needed. Otherwise it sounds >>> to me like a premature optimization, that requires us to not only >>> rewrite a >>> lot of the code, but also to later maintain >>> it, even if it ultimately proves to be not needed. Which I of course can >>> not be certain but I have a feeling that it >>> might be the case. >> >> >> The disaggregated state management we proposed is target at including but >> not limited to the following challenges: >> >> 1. Local disk constraints, including limited I/O and space. >> (discussed in FLIP-423) >> 2. Unbind the I/O resource with pre-allocated CPU resource, to make >> good use of both (motivation of FLIP-424) >> 3. Elasticity of scaling I/O or storage capacity. >> >> Thus our plan is dependent on DFS, using local disk as an optional cache >> only. 
The pre-fetching plan you mentioned is still binding I/O with CPU >> resources, and will consume even more I/O to load unnecessary state. It >> makes things worse. Please note that we are not targeting some scenarios >> where the local state could handle well, and our goal is not to replace the >> local state. >> >> And If manpower is a big concern of yours, I would say many of my >> colleagues could help contribute in runtime or SQL operators. It is >> experimental on a separate code path other than the local state and will be >> recommended to use only when we prove it mature. >> >> >> Thanks & Best, >> Zakelly >> >> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski <pnowoj...@apache.org> >> wrote: >> >>> Hey Zakelly! >>> >>> Sorry for the late reply. I still have concerns about the proposed >>> solution, with my main concerns coming from >>> the implications of the asynchronous state access API on the >>> checkpointing >>> and responsiveness of Flink. >>> >>> >> What also worries me a lot in this fine grained model is the effect on >>> the checkpointing times. >>> > >>> > Your concerns are very reasonable. Faster checkpointing is always a >>> core >>> advantage of disaggregated state, >>> > but only for the async phase. There will be some complexity introduced >>> by >>> in-flight requests, but I'd suggest >>> > a checkpoint containing those in-flight state requests as part of the >>> state, to accelerate the sync phase by >>> > skipping the buffer draining. This makes the buffer size have little >>> impact on checkpoint time. And all the >>> > changes keep within the execution model we proposed while the >>> checkpoint >>> barrier alignment or handling >>> > will not be touched in our proposal, so I guess the complexity is >>> relatively controllable. I have faith in that :) >>> >>> As we discussed off-line, you agreed that we can not checkpoint while >>> some >>> records are in the middle of being >>> processed. That we would have to drain the in-progress records before >>> doing >>> the checkpoint. You also argued >>> that this is not a problem, because the size of this buffer can be >>> configured. >>> >>> I'm really afraid of such a solution. I've seen in the past plenty of >>> times, that whenever Flink has to drain some >>> buffered records, eventually that always brakes timely checkpointing (and >>> hence ability for Flink to rescale in >>> a timely manner). Even a single record with a `flatMap` like operator >>> currently in Flink causes problems during >>> back pressure. That's especially true for example for processing >>> watermarks. At the same time, I don't see how >>> this value could be configured by even Flink's power users, let alone an >>> average user. The size of that in-flight >>> buffer not only depends on a particular query/job, but also the "good" >>> value changes dynamically over time, >>> and can change very rapidly. Sudden spikes of records or backpressure, >>> some >>> hiccup during emitting watermarks, >>> all of those could change in an instant the theoretically optimal buffer >>> size of let's say "6000" records, down to "1". >>> And when those changes happen, those are the exact times when timely >>> checkpointing matters the most. >>> If the load pattern suddenly changes, and checkpointing takes suddenly >>> tens >>> of minutes instead of a couple of >>> seconds, it means you can not use rescaling and you are forced to >>> overprovision the resources. And there also >>> other issues if checkpointing takes too long. 
>>> >>> At the same time, I still don't understand why we can not implement >>> things >>> incrementally? First >>> let's start with the current API, without the need to rewrite all of the >>> operators, we can asynchronously fetch whole >>> state for a given record using its key. That should already vastly >>> improve >>> many things, and this way we could >>> perform a checkpoint without a need of draining the in-progress/in-flight >>> buffer. We could roll that version out, >>> test it out in practice, and then we could see if the fine grained state >>> access is really needed. Otherwise it sounds >>> to me like a premature optimization, that requires us to not only >>> rewrite a >>> lot of the code, but also to later maintain >>> it, even if it ultimately proves to be not needed. Which I of course can >>> not be certain but I have a feeling that it >>> might be the case. >>> >>> Best, >>> Piotrek >>> >>> wt., 19 mar 2024 o 10:42 Zakelly Lan <zakelly....@gmail.com> napisał(a): >>> >>> > Hi everyone, >>> > >>> > Thanks for your valuable feedback! >>> > >>> > Our discussions have been going on for a while and are nearing a >>> > consensus. So I would like to start a vote after 72 hours. >>> > >>> > Please let me know if you have any concerns, thanks! >>> > >>> > >>> > Best, >>> > Zakelly >>> > >>> > On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com> >>> wrote: >>> > >>> > > Hi Yunfeng, >>> > > >>> > > Thanks for the suggestion! >>> > > >>> > > I will reorganize the FLIP-425 accordingly. >>> > > >>> > > >>> > > Best, >>> > > Zakelly >>> > > >>> > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou < >>> > flink.zhouyunf...@gmail.com> >>> > > wrote: >>> > > >>> > >> Hi Xintong and Zakelly, >>> > >> >>> > >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks >>> > >> I agree with it that watermarks can use only out-of-order mode for >>> > >> now, because there is still not a concrete example showing the >>> > >> correctness risk about it. However, the strictly-ordered mode should >>> > >> still be supported as the default option for non-record event types >>> > >> other than watermark, at least for checkpoint barriers. >>> > >> >>> > >> I noticed that this information has already been documented in "For >>> > >> other non-record events, such as RecordAttributes ...", but it's at >>> > >> the bottom of the "Watermark" section, which might not be very >>> > >> obvious. Thus it might be better to reorganize the FLIP to better >>> > >> claim that the two order modes are designed for all non-record >>> events, >>> > >> and which mode this FLIP would choose for each type of event. >>> > >> >>> > >> Best, >>> > >> Yunfeng >>> > >> >>> > >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com >>> > >>> > >> wrote: >>> > >> > >>> > >> > Thanks for the quick response. Sounds good to me. >>> > >> > >>> > >> > Best, >>> > >> > >>> > >> > Xintong >>> > >> > >>> > >> > >>> > >> > >>> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan < >>> zakelly....@gmail.com> >>> > >> wrote: >>> > >> > >>> > >> > > Hi Xintong, >>> > >> > > >>> > >> > > Thanks for sharing your thoughts! >>> > >> > > >>> > >> > > 1. Regarding Record-ordered and State-ordered of processElement. >>> > >> > > > >>> > >> > > > I understand that while State-ordered likely provides better >>> > >> performance, >>> > >> > > > Record-ordered is sometimes required for correctness. The >>> question >>> > >> is how >>> > >> > > > should a user choose between these two modes? 
My concern is >>> that >>> > >> such a >>> > >> > > > decision may require users to have in-depth knowledge about >>> the >>> > >> Flink >>> > >> > > > internals, and may lead to correctness issues if >>> State-ordered is >>> > >> chosen >>> > >> > > > improperly. >>> > >> > > > >>> > >> > > > I'd suggest not to expose such a knob, at least in the first >>> > >> version. >>> > >> > > That >>> > >> > > > means always use Record-ordered for custom operators / UDFs, >>> and >>> > >> keep >>> > >> > > > State-ordered for internal usages (built-in operators) only. >>> > >> > > > >>> > >> > > >>> > >> > > Indeed, users may not be able to choose the mode properly. I >>> agree >>> > to >>> > >> keep >>> > >> > > such options for internal use. >>> > >> > > >>> > >> > > >>> > >> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks. >>> > >> > > > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the >>> default, or >>> > >> even >>> > >> > > > being supported. From my understanding, a Watermark(T) only >>> > >> suggests that >>> > >> > > > all records with event time before T has arrived, and it has >>> > >> nothing to >>> > >> > > do >>> > >> > > > with whether records with event time after T has arrived or >>> not. >>> > >> From >>> > >> > > that >>> > >> > > > perspective, preventing certain records from arriving before a >>> > >> Watermark >>> > >> > > is >>> > >> > > > never supported. I also cannot come up with any use case where >>> > >> > > > Strictly-ordered is necessary. This implies the same issue as >>> 1): >>> > >> how >>> > >> > > does >>> > >> > > > the user choose between the two modes? >>> > >> > > > >>> > >> > > > I'd suggest not expose the knob to users and only support >>> > >> Out-of-order, >>> > >> > > > until we see a concrete use case that Strictly-ordered is >>> needed. >>> > >> > > > >>> > >> > > >>> > >> > > The semantics of watermarks do not define the sequence between a >>> > >> watermark >>> > >> > > and subsequent records. For the most part, this is >>> inconsequential, >>> > >> except >>> > >> > > it may affect some current users who have previously relied on >>> the >>> > >> implicit >>> > >> > > assumption of an ordered execution. I'd be fine with initially >>> > >> supporting >>> > >> > > only out-of-order processing. We may consider exposing the >>> > >> > > 'Strictly-ordered' mode once we encounter a concrete use case >>> that >>> > >> > > necessitates it. >>> > >> > > >>> > >> > > >>> > >> > > My philosophies behind not exposing the two config options are: >>> > >> > > > - There are already too many options in Flink that barely >>> know how >>> > >> to use >>> > >> > > > them. I think Flink should try as much as possible to decide >>> its >>> > own >>> > >> > > > behavior, rather than throwing all the decisions to the users. >>> > >> > > > - It's much harder to take back knobs than to introduce them. >>> > >> Therefore, >>> > >> > > > options should be introduced only if concrete use cases are >>> > >> identified. >>> > >> > > > >>> > >> > > >>> > >> > > I agree to keep minimal configurable items especially for the >>> MVP. >>> > >> Given >>> > >> > > that we have the opportunity to refine the functionality before >>> the >>> > >> > > framework transitions from @Experimental to @PublicEvolving, it >>> > makes >>> > >> sense >>> > >> > > to refrain from presenting user-facing options until we have >>> ensured >>> > >> > > their necessity. 
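To make the point about keeping this ordering knob internal a bit more concrete, here is a purely illustrative sketch; the class and method names below are hypothetical and the actual FLIP-425 interfaces may well differ. The idea is that the ordering guarantee is declared per operator inside the runtime, so built-in operators can opt into the weaker mode while user-defined operators always get the safe default, and no user-facing configuration option is needed:

    // Hypothetical sketch only -- not an existing Flink API.
    enum ElementOrder {
        RECORD_ORDER,  // same-key records are fully processed in arrival order (safe default)
        STATE_ORDER    // only state accesses are ordered; more parallelism, internal operators only
    }

    public abstract class AbstractAsyncStateOperator<OUT>
            extends org.apache.flink.streaming.api.operators.AbstractStreamOperator<OUT> {

        /** Built-in operators may override this; user code always runs with the safe default. */
        protected ElementOrder getElementOrder() {
            return ElementOrder.RECORD_ORDER;
        }
    }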
>>> > >> > > >>> > >> > > >>> > >> > > Best, >>> > >> > > Zakelly >>> > >> > > >>> > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song < >>> > tonysong...@gmail.com> >>> > >> > > wrote: >>> > >> > > >>> > >> > > > Sorry for joining the discussion late. >>> > >> > > > >>> > >> > > > I have two questions about FLIP-425. >>> > >> > > > >>> > >> > > > 1. Regarding Record-ordered and State-ordered of >>> processElement. >>> > >> > > > >>> > >> > > > I understand that while State-ordered likely provides better >>> > >> performance, >>> > >> > > > Record-ordered is sometimes required for correctness. The >>> question >>> > >> is how >>> > >> > > > should a user choose between these two modes? My concern is >>> that >>> > >> such a >>> > >> > > > decision may require users to have in-depth knowledge about >>> the >>> > >> Flink >>> > >> > > > internals, and may lead to correctness issues if >>> State-ordered is >>> > >> chosen >>> > >> > > > improperly. >>> > >> > > > >>> > >> > > > I'd suggest not to expose such a knob, at least in the first >>> > >> version. >>> > >> > > That >>> > >> > > > means always use Record-ordered for custom operators / UDFs, >>> and >>> > >> keep >>> > >> > > > State-ordered for internal usages (built-in operators) only. >>> > >> > > > >>> > >> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks. >>> > >> > > > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the >>> default, or >>> > >> even >>> > >> > > > being supported. From my understanding, a Watermark(T) only >>> > >> suggests that >>> > >> > > > all records with event time before T has arrived, and it has >>> > >> nothing to >>> > >> > > do >>> > >> > > > with whether records with event time after T has arrived or >>> not. >>> > >> From >>> > >> > > that >>> > >> > > > perspective, preventing certain records from arriving before a >>> > >> Watermark >>> > >> > > is >>> > >> > > > never supported. I also cannot come up with any use case where >>> > >> > > > Strictly-ordered is necessary. This implies the same issue as >>> 1): >>> > >> how >>> > >> > > does >>> > >> > > > the user choose between the two modes? >>> > >> > > > >>> > >> > > > I'd suggest not expose the knob to users and only support >>> > >> Out-of-order, >>> > >> > > > until we see a concrete use case that Strictly-ordered is >>> needed. >>> > >> > > > >>> > >> > > > >>> > >> > > > My philosophies behind not exposing the two config options >>> are: >>> > >> > > > - There are already too many options in Flink that barely >>> know how >>> > >> to use >>> > >> > > > them. I think Flink should try as much as possible to decide >>> its >>> > own >>> > >> > > > behavior, rather than throwing all the decisions to the users. >>> > >> > > > - It's much harder to take back knobs than to introduce them. >>> > >> Therefore, >>> > >> > > > options should be introduced only if concrete use cases are >>> > >> identified. >>> > >> > > > >>> > >> > > > WDYT? >>> > >> > > > >>> > >> > > > Best, >>> > >> > > > >>> > >> > > > Xintong >>> > >> > > > >>> > >> > > > >>> > >> > > > >>> > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge >>> <j...@ververica.com.invalid >>> > > >>> > >> > > wrote: >>> > >> > > > >>> > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which >>> > >> introduced >>> > >> > > the >>> > >> > > > > intention of the big change and high level architecture. >>> Great >>> > >> content >>> > >> > > > btw! 
>>> > >> > > > > The only public interface change for this FLIP is one new >>> config >>> > >> to use >>> > >> > > > > ForSt. It makes sense to have one dedicated discussion >>> thread >>> > for >>> > >> each >>> > >> > > > > concrete system design. >>> > >> > > > > >>> > >> > > > > @Zakelly The links in your mail do not work except the last >>> one, >>> > >> > > because >>> > >> > > > > the FLIP-xxx has been included into the url like >>> > >> > > > > >>> > >> > > >>> > >> >>> > >>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 >>> > >> > > > . >>> > >> > > > > >>> > >> > > > > NIT fix: >>> > >> > > > > >>> > >> > > > > FLIP-424: >>> > >> > > > >>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864 >>> > >> > > > > >>> > >> > > > > FLIP-425: >>> > >> > > > >>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h >>> > >> > > > > >>> > >> > > > > FLIP-426: >>> > >> > > > >>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf >>> > >> > > > > >>> > >> > > > > FLIP-427: >>> > >> > > > >>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft >>> > >> > > > > >>> > >> > > > > FLIP-428: >>> > >> > > > >>> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b >>> > >> > > > > >>> > >> > > > > Best regards, >>> > >> > > > > Jing >>> > >> > > > > >>> > >> > > > > >>> > >> > > > > >>> > >> > > > > >>> > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan < >>> > >> zakelly....@gmail.com> >>> > >> > > > wrote: >>> > >> > > > > >>> > >> > > > > > Hi everyone, >>> > >> > > > > > >>> > >> > > > > > Thank you all for a lively discussion here, and it is a >>> good >>> > >> time to >>> > >> > > > move >>> > >> > > > > > forward to more detailed discussions. Thus we open several >>> > >> threads >>> > >> > > for >>> > >> > > > > > sub-FLIPs: >>> > >> > > > > > >>> > >> > > > > > FLIP-424: >>> > >> > > > > >>> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864 >>> > >> > > > > > FLIP-425 >>> > >> > > > > > < >>> > >> > > > > >>> > >> > > >>> > >> >>> > >>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 >>> > >> > > > >: >>> > >> > > > > > >>> > >> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h >>> > >> > > > > > FLIP-426 >>> > >> > > > > > < >>> > >> > > > > >>> > >> > > >>> > >> >>> > >>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426 >>> > >> > > > >: >>> > >> > > > > > >>> > >> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf >>> > >> > > > > > FLIP-427 >>> > >> > > > > > < >>> > >> > > > > >>> > >> > > >>> > >> >>> > >>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427 >>> > >> > > > >: >>> > >> > > > > > >>> > >> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft >>> > >> > > > > > FLIP-428 >>> > >> > > > > > < >>> > >> > > > > >>> > >> > > >>> > >> >>> > >>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428 >>> > >> > > > >: >>> > >> > > > > > >>> > >> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b >>> > >> > > > > > >>> > >> > > > > > If you want to talk about the overall architecture, >>> roadmap, >>> > >> > > milestones >>> > >> > > > > or >>> > >> > > > > > something related with multiple FLIPs, please post it >>> here. >>> > >> Otherwise >>> > >> > > > you >>> > >> > > > > > can discuss some details in separate mails. Let's try to >>> avoid >>> > >> > > repeated >>> > >> > > > > > discussion in different threads. 
I will sync important >>> > messages >>> > >> here >>> > >> > > if >>> > >> > > > > > there are any in the above threads. >>> > >> > > > > > >>> > >> > > > > > And reply to @Jeyhun: We will ensure the content between >>> those >>> > >> FLIPs >>> > >> > > is >>> > >> > > > > > consistent. >>> > >> > > > > > >>> > >> > > > > > >>> > >> > > > > > Best, >>> > >> > > > > > Zakelly >>> > >> > > > > > >>> > >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei < >>> > yuanmei.w...@gmail.com >>> > >> > >>> > >> > > > wrote: >>> > >> > > > > > >>> > >> > > > > > > I have been a bit busy these few weeks and sorry for >>> > >> responding >>> > >> > > late. >>> > >> > > > > > > >>> > >> > > > > > > The original thinking of keeping discussion within one >>> > thread >>> > >> is >>> > >> > > for >>> > >> > > > > > easier >>> > >> > > > > > > tracking and avoid for repeated discussion in different >>> > >> threads. >>> > >> > > > > > > >>> > >> > > > > > > For details, It might be good to start in different >>> threads >>> > if >>> > >> > > > needed. >>> > >> > > > > > > >>> > >> > > > > > > We will think of a way to better organize the >>> discussion. >>> > >> > > > > > > >>> > >> > > > > > > Best >>> > >> > > > > > > Yuan >>> > >> > > > > > > >>> > >> > > > > > > >>> > >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov < >>> > >> > > je.kari...@gmail.com> >>> > >> > > > > > > wrote: >>> > >> > > > > > > >>> > >> > > > > > > > Hi, >>> > >> > > > > > > > >>> > >> > > > > > > > + 1 for the suggestion. >>> > >> > > > > > > > Maybe we can the discussion with the FLIPs with >>> minimum >>> > >> > > > dependencies >>> > >> > > > > > > (from >>> > >> > > > > > > > the other new/proposed FLIPs). >>> > >> > > > > > > > Based on our discussion on a particular FLIP, the >>> > >> subsequent (or >>> > >> > > > its >>> > >> > > > > > > > dependent) FLIP(s) can be updated accordingly? >>> > >> > > > > > > > >>> > >> > > > > > > > Regards, >>> > >> > > > > > > > Jeyhun >>> > >> > > > > > > > >>> > >> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra < >>> > >> gyula.f...@gmail.com> >>> > >> > > > > > wrote: >>> > >> > > > > > > > >>> > >> > > > > > > > > Hey all! >>> > >> > > > > > > > > >>> > >> > > > > > > > > This is a massive improvement / work. I just started >>> > going >>> > >> > > > through >>> > >> > > > > > the >>> > >> > > > > > > > > Flips and have a more or less meta comment. >>> > >> > > > > > > > > >>> > >> > > > > > > > > While it's good to keep the overall architecture >>> > >> discussion >>> > >> > > > here, I >>> > >> > > > > > > think >>> > >> > > > > > > > > we should still have separate discussions for each >>> FLIP >>> > >> where >>> > >> > > we >>> > >> > > > > can >>> > >> > > > > > > > > discuss interface details etc. With so much content >>> if >>> > we >>> > >> start >>> > >> > > > > > adding >>> > >> > > > > > > > > minor comments here that will lead to nowhere but >>> those >>> > >> > > > discussions >>> > >> > > > > > are >>> > >> > > > > > > > > still important and we should have them in separate >>> > >> threads >>> > >> > > (one >>> > >> > > > > for >>> > >> > > > > > > each >>> > >> > > > > > > > > FLIP) >>> > >> > > > > > > > > >>> > >> > > > > > > > > What do you think? 
>>> > >> > > > > > > > > Gyula >>> > >> > > > > > > > > >>> > >> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei < >>> > >> fredia...@gmail.com >>> > >> > > > >>> > >> > > > > > wrote: >>> > >> > > > > > > > > >>> > >> > > > > > > > > > Hi team, >>> > >> > > > > > > > > > >>> > >> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we >>> > have >>> > >> > > > > > supplemented >>> > >> > > > > > > > > > several updates to answer high-frequency >>> questions: >>> > >> > > > > > > > > > >>> > >> > > > > > > > > > 1. We captured a flame graph of the Hashmap state >>> > >> backend in >>> > >> > > > > > > > > > "Synchronous execution with asynchronous APIs"[1], >>> > which >>> > >> > > > reveals >>> > >> > > > > > that >>> > >> > > > > > > > > > the framework overhead (including reference >>> counting, >>> > >> > > > > > future-related >>> > >> > > > > > > > > > code and so on) consumes about 9% of the keyed >>> > operator >>> > >> CPU >>> > >> > > > time. >>> > >> > > > > > > > > > 2. We added a set of comparative experiments for >>> > >> watermark >>> > >> > > > > > > processing, >>> > >> > > > > > > > > > the performance of Out-Of-Order mode is 70% better >>> > than >>> > >> > > > > > > > > > strictly-ordered mode under ~140MB state size. >>> > >> Instructions >>> > >> > > on >>> > >> > > > > how >>> > >> > > > > > to >>> > >> > > > > > > > > > run this test have also been added[2]. >>> > >> > > > > > > > > > 3. Regarding the order of StreamRecord, whether >>> it has >>> > >> state >>> > >> > > > > access >>> > >> > > > > > > or >>> > >> > > > > > > > > > not. We supplemented a new *Strict order of >>> > >> > > > 'processElement'*[3]. >>> > >> > > > > > > > > > >>> > >> > > > > > > > > > [1] >>> > >> > > > > > > > > > >>> > >> > > > > > > > > >>> > >> > > > > > > > >>> > >> > > > > > > >>> > >> > > > > > >>> > >> > > > > >>> > >> > > > >>> > >> > > >>> > >> >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs >>> > >> > > > > > > > > > [2] >>> > >> > > > > > > > > > >>> > >> > > > > > > > > >>> > >> > > > > > > > >>> > >> > > > > > > >>> > >> > > > > > >>> > >> > > > > >>> > >> > > > >>> > >> > > >>> > >> >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing >>> > >> > > > > > > > > > [3] >>> > >> > > > > > > > > > >>> > >> > > > > > > > > >>> > >> > > > > > > > >>> > >> > > > > > > >>> > >> > > > > > >>> > >> > > > > >>> > >> > > > >>> > >> > > >>> > >> >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder >>> > >> > > > > > > > > > >>> > >> > > > > > > > > > >>> > >> > > > > > > > > > Best regards, >>> > >> > > > > > > > > > Yanfei >>> > >> > > > > > > > > > >>> > >> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> >>> > 于2024年3月5日周二 >>> > >> > > > 09:25写道: >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > Hi Zakelly, >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later >>> > >> since it >>> > >> > > is >>> > >> > > > > not >>> > >> > > > > > > > > > important. 
>>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > It seems that we still have some details to >>> confirm >>> > >> about >>> > >> > > > this >>> > >> > > > > > > > > > > question. Let's postpone this to after the >>> critical >>> > >> parts >>> > >> > > of >>> > >> > > > > the >>> > >> > > > > > > > > > > design are settled. >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > > 8. Yes, we had considered ... metrics should >>> be >>> > like >>> > >> > > > > > afterwards. >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with >>> discussing >>> > >> this >>> > >> > > > topic >>> > >> > > > > > in >>> > >> > > > > > > > > > milestone 2. >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > Looking forward to the detailed design about the >>> > >> strict >>> > >> > > mode >>> > >> > > > > > > between >>> > >> > > > > > > > > > > same-key records and the benchmark results >>> about the >>> > >> epoch >>> > >> > > > > > > mechanism. >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > Best regards, >>> > >> > > > > > > > > > > Yunfeng >>> > >> > > > > > > > > > > >>> > >> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan < >>> > >> > > > > > zakelly....@gmail.com> >>> > >> > > > > > > > > > wrote: >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > Hi Yunfeng, >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > For 1: >>> > >> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I >>> realize >>> > >> it is >>> > >> > > a >>> > >> > > > > > common >>> > >> > > > > > > > > case >>> > >> > > > > > > > > > the same-key record should be blocked before the >>> > >> > > > > `processElement`. >>> > >> > > > > > It >>> > >> > > > > > > > is >>> > >> > > > > > > > > > easier for users to understand. Thus I will >>> introduce >>> > a >>> > >> > > strict >>> > >> > > > > mode >>> > >> > > > > > > for >>> > >> > > > > > > > > > this and make it default. My rough idea is just >>> like >>> > >> yours, >>> > >> > > by >>> > >> > > > > > > invoking >>> > >> > > > > > > > > > some method of AEC instance before >>> `processElement`. >>> > The >>> > >> > > > detailed >>> > >> > > > > > > > design >>> > >> > > > > > > > > > will be described in FLIP later. >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > For 2: >>> > >> > > > > > > > > > > > I agree with you. We could throw exceptions >>> for >>> > now >>> > >> and >>> > >> > > > > > optimize >>> > >> > > > > > > > this >>> > >> > > > > > > > > > later. >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > For 5: >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> It might be better to move the default >>> values to >>> > >> the >>> > >> > > > > Proposed >>> > >> > > > > > > > > Changes >>> > >> > > > > > > > > > > >> section instead of making them public for >>> now, as >>> > >> there >>> > >> > > > will >>> > >> > > > > > be >>> > >> > > > > > > > > > > >> compatibility issues once we want to >>> dynamically >>> > >> adjust >>> > >> > > > the >>> > >> > > > > > > > > thresholds >>> > >> > > > > > > > > > > >> and timeouts in future. >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > Agreed. The whole framework is under >>> experiment >>> > >> until we >>> > >> > > > > think >>> > >> > > > > > it >>> > >> > > > > > > > is >>> > >> > > > > > > > > > complete in 2.0 or later. 
The default value >>> should be >>> > >> better >>> > >> > > > > > > determined >>> > >> > > > > > > > > > with more testing results and production >>> experience. >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > >> The configuration >>> execution.async-state.enabled >>> > >> seems >>> > >> > > > > > > unnecessary, >>> > >> > > > > > > > > as >>> > >> > > > > > > > > > > >> the infrastructure may automatically get this >>> > >> > > information >>> > >> > > > > from >>> > >> > > > > > > the >>> > >> > > > > > > > > > > >> detailed state backend configurations. We may >>> > >> revisit >>> > >> > > this >>> > >> > > > > > part >>> > >> > > > > > > > > after >>> > >> > > > > > > > > > > >> the core designs have reached an agreement. >>> WDYT? >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > I'm not very sure if there is any use case >>> where >>> > >> users >>> > >> > > > write >>> > >> > > > > > > their >>> > >> > > > > > > > > > code using async APIs but run their job in a >>> > >> synchronous way. >>> > >> > > > The >>> > >> > > > > > > first >>> > >> > > > > > > > > two >>> > >> > > > > > > > > > scenarios that come to me are for benchmarking or >>> for >>> > a >>> > >> small >>> > >> > > > > > state, >>> > >> > > > > > > > > while >>> > >> > > > > > > > > > they don't want to rewrite their code. Actually >>> it is >>> > >> easy to >>> > >> > > > > > > support, >>> > >> > > > > > > > so >>> > >> > > > > > > > > > I'd suggest providing it. But I'm fine with >>> revisiting >>> > >> this >>> > >> > > > later >>> > >> > > > > > > since >>> > >> > > > > > > > > it >>> > >> > > > > > > > > > is not important. WDYT? >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > For 8: >>> > >> > > > > > > > > > > > Yes, we had considered the I/O metrics group >>> > >> especially >>> > >> > > the >>> > >> > > > > > > > > > back-pressure, idle and task busy per second. In >>> the >>> > >> current >>> > >> > > > plan >>> > >> > > > > > we >>> > >> > > > > > > > can >>> > >> > > > > > > > > do >>> > >> > > > > > > > > > state access during back-pressure, meaning that >>> those >>> > >> metrics >>> > >> > > > for >>> > >> > > > > > > input >>> > >> > > > > > > > > > would better be redefined. I suggest we discuss >>> these >>> > >> > > existing >>> > >> > > > > > > metrics >>> > >> > > > > > > > as >>> > >> > > > > > > > > > well as some new metrics that should be >>> introduced in >>> > >> > > FLIP-431 >>> > >> > > > > > later >>> > >> > > > > > > in >>> > >> > > > > > > > > > milestone 2, since we have basically finished the >>> > >> framework >>> > >> > > > thus >>> > >> > > > > we >>> > >> > > > > > > > will >>> > >> > > > > > > > > > have a better view of what metrics should be like >>> > >> afterwards. >>> > >> > > > > WDYT? >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > Best, >>> > >> > > > > > > > > > > > Zakelly >>> > >> > > > > > > > > > > > >>> > >> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou < >>> > >> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote: >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Hi Zakelly, >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Thanks for the responses! >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL >>> > >> > > developers. >>> > >> > > > > ... >>> > >> > > > > > > mode >>> > >> > > > > > > > > > for StreamRecord processing. 
>>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> In DataStream API there should also be use >>> cases >>> > >> when >>> > >> > > the >>> > >> > > > > > order >>> > >> > > > > > > of >>> > >> > > > > > > > > > > >> output is strictly required. I agree with it >>> that >>> > >> SQL >>> > >> > > > > experts >>> > >> > > > > > > may >>> > >> > > > > > > > > help >>> > >> > > > > > > > > > > >> provide more concrete use cases that can >>> > >> accelerate our >>> > >> > > > > > > > discussion, >>> > >> > > > > > > > > > > >> but please allow me to search for DataStream >>> use >>> > >> cases >>> > >> > > > that >>> > >> > > > > > can >>> > >> > > > > > > > > prove >>> > >> > > > > > > > > > > >> the necessity of this strict order >>> preservation >>> > >> mode, if >>> > >> > > > > > answers >>> > >> > > > > > > > > from >>> > >> > > > > > > > > > > >> SQL experts are shown to be negative. >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> For your convenience, my current rough idea >>> is >>> > >> that we >>> > >> > > can >>> > >> > > > > > add a >>> > >> > > > > > > > > > > >> module between the Input(s) and >>> processElement() >>> > >> module >>> > >> > > in >>> > >> > > > > > Fig 2 >>> > >> > > > > > > > of >>> > >> > > > > > > > > > > >> FLIP-425. The module will be responsible for >>> > >> caching >>> > >> > > > records >>> > >> > > > > > > whose >>> > >> > > > > > > > > > > >> keys collide with in-flight records, and AEC >>> will >>> > >> only >>> > >> > > be >>> > >> > > > > > > > > responsible >>> > >> > > > > > > > > > > >> for handling async state calls, without >>> knowing >>> > the >>> > >> > > record >>> > >> > > > > > each >>> > >> > > > > > > > call >>> > >> > > > > > > > > > > >> belongs to. We may revisit this topic once >>> the >>> > >> necessity >>> > >> > > > of >>> > >> > > > > > the >>> > >> > > > > > > > > strict >>> > >> > > > > > > > > > > >> order mode is clarified. >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ... >>> > >> instead of >>> > >> > > > > > > invoking >>> > >> > > > > > > > > > yield >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Your suggestions generally appeal to me. I >>> think >>> > >> we may >>> > >> > > > let >>> > >> > > > > > > > > > > >> corresponding Flink jobs fail with OOM for >>> now, >>> > >> since >>> > >> > > the >>> > >> > > > > > > majority >>> > >> > > > > > > > > of >>> > >> > > > > > > > > > > >> a StateRequest should just be references to >>> > >> existing >>> > >> > > Java >>> > >> > > > > > > objects, >>> > >> > > > > > > > > > > >> which only occupies very small memory space >>> and >>> > can >>> > >> > > hardly >>> > >> > > > > > cause >>> > >> > > > > > > > OOM >>> > >> > > > > > > > > > > >> in common cases. We can monitor the pending >>> > >> > > StateRequests >>> > >> > > > > and >>> > >> > > > > > if >>> > >> > > > > > > > > there >>> > >> > > > > > > > > > > >> is really a risk of OOM in extreme cases, we >>> can >>> > >> throw >>> > >> > > > > > > Exceptions >>> > >> > > > > > > > > with >>> > >> > > > > > > > > > > >> proper messages notifying users what to do, >>> like >>> > >> > > > increasing >>> > >> > > > > > > memory >>> > >> > > > > > > > > > > >> through configurations. 
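A minimal illustrative sketch of the kind of guard discussed above -- all names here are hypothetical and not taken from the FLIPs -- where pending StateRequests are counted and the job fails with an actionable message instead of a bare OutOfMemoryError in extreme cases:

    // Hypothetical helper; thresholds would come from configuration.
    final class InFlightRequestGuard {
        private final long maxPendingRequests;
        private long pendingRequests;

        InFlightRequestGuard(long maxPendingRequests) {
            this.maxPendingRequests = maxPendingRequests;
        }

        void onRequestSubmitted() {
            if (++pendingRequests > maxPendingRequests) {
                throw new IllegalStateException(
                        "Too many in-flight state requests (" + pendingRequests + "); "
                                + "consider lowering the in-flight record limit or increasing task memory.");
            }
        }

        void onRequestCompleted() {
            pendingRequests--;
        }
    }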
>>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Your suggestions to adjust threshold >>> adaptively >>> > or >>> > >> to >>> > >> > > use >>> > >> > > > > the >>> > >> > > > > > > > > blocking >>> > >> > > > > > > > > > > >> buffer sounds good, and in my opinion we can >>> > >> postpone >>> > >> > > them >>> > >> > > > > to >>> > >> > > > > > > > future >>> > >> > > > > > > > > > > >> FLIPs since they seem to only benefit users >>> in >>> > rare >>> > >> > > cases. >>> > >> > > > > > Given >>> > >> > > > > > > > > that >>> > >> > > > > > > > > > > >> FLIP-423~428 has already been a big enough >>> > design, >>> > >> it >>> > >> > > > might >>> > >> > > > > be >>> > >> > > > > > > > > better >>> > >> > > > > > > > > > > >> to focus on the most critical design for now >>> and >>> > >> > > postpone >>> > >> > > > > > > > > > > >> optimizations like this. WDYT? >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as >>> well >>> > as >>> > >> > > their >>> > >> > > > > > > default >>> > >> > > > > > > > > > value. >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Thanks for adding the default values and the >>> > values >>> > >> > > > > themselves >>> > >> > > > > > > > LGTM. >>> > >> > > > > > > > > > > >> It might be better to move the default >>> values to >>> > >> the >>> > >> > > > > Proposed >>> > >> > > > > > > > > Changes >>> > >> > > > > > > > > > > >> section instead of making them public for >>> now, as >>> > >> there >>> > >> > > > will >>> > >> > > > > > be >>> > >> > > > > > > > > > > >> compatibility issues once we want to >>> dynamically >>> > >> adjust >>> > >> > > > the >>> > >> > > > > > > > > thresholds >>> > >> > > > > > > > > > > >> and timeouts in future. >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> The configuration >>> execution.async-state.enabled >>> > >> seems >>> > >> > > > > > > unnecessary, >>> > >> > > > > > > > > as >>> > >> > > > > > > > > > > >> the infrastructure may automatically get this >>> > >> > > information >>> > >> > > > > from >>> > >> > > > > > > the >>> > >> > > > > > > > > > > >> detailed state backend configurations. We may >>> > >> revisit >>> > >> > > this >>> > >> > > > > > part >>> > >> > > > > > > > > after >>> > >> > > > > > > > > > > >> the core designs have reached an agreement. >>> WDYT? >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it >>> comes >>> > >> to me >>> > >> > > > that >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that >>> > measure >>> > >> the >>> > >> > > > time >>> > >> > > > > a >>> > >> > > > > > > > Flink >>> > >> > > > > > > > > > > >> job is back-pressured by State IOs? Under the >>> > >> current >>> > >> > > > > design, >>> > >> > > > > > > this >>> > >> > > > > > > > > > > >> metric could measure the time when the >>> blocking >>> > >> buffer >>> > >> > > is >>> > >> > > > > full >>> > >> > > > > > > and >>> > >> > > > > > > > > > > >> yield() cannot get callbacks to process, >>> which >>> > >> means the >>> > >> > > > > > > operator >>> > >> > > > > > > > is >>> > >> > > > > > > > > > > >> fully waiting for state responses. 
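A rough sketch of what such a metric could look like, using Flink's existing metrics API; the metric name and the tracker class below are hypothetical, and the actual metrics are deferred to FLIP-431 / milestone 2 as discussed above. It accumulates the time during which the blocking buffer is full and yield() finds no callback to run, i.e. the operator is purely waiting on state responses:

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    // Hypothetical tracker for "fully waiting on state" time.
    final class StateWaitTimeTracker {
        private volatile long accumulatedWaitNanos;
        private long waitStartNanos = -1L;

        void register(MetricGroup group) {
            group.gauge("stateFullWaitTimeNanos", (Gauge<Long>) () -> accumulatedWaitNanos);
        }

        void waitStarted() {
            waitStartNanos = System.nanoTime();
        }

        void waitEnded() {
            if (waitStartNanos >= 0) {
                accumulatedWaitNanos += System.nanoTime() - waitStartNanos;
                waitStartNanos = -1L;
            }
        }
    }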
>>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> Best regards, >>> > >> > > > > > > > > > > >> Yunfeng >>> > >> > > > > > > > > > > >> >>> > >> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan < >>> > >> > > > > > > > zakelly....@gmail.com> >>> > >> > > > > > > > > > wrote: >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Hi Yunfeng, >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Thanks for your detailed comments! >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 1. Why do we need a close() method on >>> > >> StateIterator? >>> > >> > > > This >>> > >> > > > > > > > method >>> > >> > > > > > > > > > seems >>> > >> > > > > > > > > > > >> >> unused in the usage example codes. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > The `close()` is introduced to release >>> internal >>> > >> > > > resources, >>> > >> > > > > > but >>> > >> > > > > > > > it >>> > >> > > > > > > > > > does not seem to require the user to call it. I >>> > removed >>> > >> this. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, >>> it >>> > is >>> > >> > > stated >>> > >> > > > > that >>> > >> > > > > > > "No >>> > >> > > > > > > > > > null >>> > >> > > > > > > > > > > >> >> entries are allowed". It might be better >>> to >>> > >> further >>> > >> > > > > explain >>> > >> > > > > > > > what >>> > >> > > > > > > > > > will >>> > >> > > > > > > > > > > >> >> happen if a null value is passed, >>> ignoring the >>> > >> value >>> > >> > > in >>> > >> > > > > the >>> > >> > > > > > > > > > returned >>> > >> > > > > > > > > > > >> >> Collection or throwing exceptions. Given >>> that >>> > >> > > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned >>> in >>> > the >>> > >> > > > example >>> > >> > > > > > > code, >>> > >> > > > > > > > I >>> > >> > > > > > > > > > > >> >> suppose the former one might be correct. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > The statement "No null entries are allowed" >>> > >> refers to >>> > >> > > > the >>> > >> > > > > > > > > > parameters, it means some arrayList like [null, >>> > >> StateFuture1, >>> > >> > > > > > > > > StateFuture2] >>> > >> > > > > > > > > > passed in are not allowed, and an Exception will >>> be >>> > >> thrown. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . >>> This >>> > >> > > > situation >>> > >> > > > > > > should >>> > >> > > > > > > > > be >>> > >> > > > > > > > > > > >> >> avoided and the order of same-key records >>> > >> should be >>> > >> > > > > > strictly >>> > >> > > > > > > > > > > >> >> preserved. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > I will discuss this with some expert SQL >>> > >> developers. >>> > >> > > And >>> > >> > > > > if >>> > >> > > > > > it >>> > >> > > > > > > > is >>> > >> > > > > > > > > > valid and common, I suggest a strict order >>> > preservation >>> > >> mode >>> > >> > > > for >>> > >> > > > > > > > > > StreamRecord processing. WDYT? >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests >>> submitted >>> > by >>> > >> > > > > Callbacks >>> > >> > > > > > > will >>> > >> > > > > > > > > not >>> > >> > > > > > > > > > > >> >> invoke further yield() methods. 
Given that >>> > >> yield() is >>> > >> > > > > used >>> > >> > > > > > > when >>> > >> > > > > > > > > > there >>> > >> > > > > > > > > > > >> >> is "too much" in-flight data, does it mean >>> > >> > > > StateRequests >>> > >> > > > > > > > > submitted >>> > >> > > > > > > > > > by >>> > >> > > > > > > > > > > >> >> Callbacks will never be "too much"? What >>> if >>> > the >>> > >> total >>> > >> > > > > > number >>> > >> > > > > > > of >>> > >> > > > > > > > > > > >> >> StateRequests exceed the capacity of Flink >>> > >> operator's >>> > >> > > > > > memory >>> > >> > > > > > > > > space? >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > The amount of parallel StateRequests for >>> one >>> > >> > > > StreamRecord >>> > >> > > > > > > cannot >>> > >> > > > > > > > > be >>> > >> > > > > > > > > > determined since the code is written by users. So >>> the >>> > >> > > in-flight >>> > >> > > > > > > > requests >>> > >> > > > > > > > > > may be "too much", and may cause OOM. Users should >>> > >> > > re-configure >>> > >> > > > > > their >>> > >> > > > > > > > > job, >>> > >> > > > > > > > > > controlling the amount of on-going StreamRecord. >>> And I >>> > >> > > suggest >>> > >> > > > > two >>> > >> > > > > > > ways >>> > >> > > > > > > > > to >>> > >> > > > > > > > > > avoid this: >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Adaptively adjust the count of on-going >>> > >> StreamRecord >>> > >> > > > > > according >>> > >> > > > > > > > to >>> > >> > > > > > > > > > historical StateRequests amount. >>> > >> > > > > > > > > > > >> > Also control the max StateRequests that >>> can be >>> > >> > > executed >>> > >> > > > in >>> > >> > > > > > > > > parallel >>> > >> > > > > > > > > > for each StreamRecord, and if it exceeds, put the >>> new >>> > >> > > > > StateRequest >>> > >> > > > > > in >>> > >> > > > > > > > the >>> > >> > > > > > > > > > blocking buffer waiting for execution (instead of >>> > >> invoking >>> > >> > > > > > yield()). >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > WDYT? >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order >>> > >> execution >>> > >> > > mode, >>> > >> > > > > > along >>> > >> > > > > > > > > with >>> > >> > > > > > > > > > the >>> > >> > > > > > > > > > > >> >> epoch mechanism, would bring more >>> complexity >>> > to >>> > >> the >>> > >> > > > > > execution >>> > >> > > > > > > > > model >>> > >> > > > > > > > > > > >> >> than the performance improvement it >>> promises. >>> > >> Could >>> > >> > > we >>> > >> > > > > add >>> > >> > > > > > > some >>> > >> > > > > > > > > > > >> >> benchmark results proving the benefit of >>> this >>> > >> mode? >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Agreed, will do. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public >>> API >>> > >> section >>> > >> > > > > > > describing >>> > >> > > > > > > > > how >>> > >> > > > > > > > > > > >> >> users or developers can switch between >>> these >>> > two >>> > >> > > > > execution >>> > >> > > > > > > > modes. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Good point. We will add a Public API >>> section. 
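To make the asynchronous API usage discussed in this thread a bit more tangible (for example the FutureUtils.combineAll() contract mentioned a few replies above), here is a very rough usage sketch. Method names such as asyncValue(), thenAccept() and combineAll() follow the FLIP-424 drafts and may differ in the final API; clickCountState, viewCountState and the output collector `out` are assumed to exist in an enclosing operator.

    // Illustrative only -- see FLIP-424 for the proposed interfaces.
    StateFuture<Long> clicks = clickCountState.asyncValue();
    StateFuture<Long> views = viewCountState.asyncValue();

    // combineAll() rejects null entries (an Exception is thrown), per the discussion above.
    FutureUtils.combineAll(java.util.Arrays.asList(clicks, views))
            .thenAccept(values -> {
                long total = 0L;
                for (Long v : values) {
                    total += v;
                }
                out.collect(total);
            });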
>>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 3.3 Apart from the watermark and >>> checkpoint >>> > >> mentioned >>> > >> > > > in >>> > >> > > > > > this >>> > >> > > > > > > > > FLIP, >>> > >> > > > > > > > > > > >> >> there are also more other events that >>> might >>> > >> appear in >>> > >> > > > the >>> > >> > > > > > > > stream >>> > >> > > > > > > > > of >>> > >> > > > > > > > > > > >> >> data records. It might be better to >>> generalize >>> > >> the >>> > >> > > > > > execution >>> > >> > > > > > > > mode >>> > >> > > > > > > > > > > >> >> mechanism to handle all possible events. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the >>> > >> reminder. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 4. It might be better to treat >>> > >> callback-handling as a >>> > >> > > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to >>> > >> avoid the >>> > >> > > > > > overhead >>> > >> > > > > > > > of >>> > >> > > > > > > > > > > >> >> repeatedly creating Mail objects. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > I thought the intermediated wrapper for >>> > >> callback can >>> > >> > > > not >>> > >> > > > > be >>> > >> > > > > > > > > > omitted, since there will be some context switch >>> > before >>> > >> each >>> > >> > > > > > > execution. >>> > >> > > > > > > > > The >>> > >> > > > > > > > > > MailboxDefaultAction in most cases is processInput >>> > >> right? >>> > >> > > While >>> > >> > > > > the >>> > >> > > > > > > > > > callback should be executed with higher priority. >>> I'd >>> > >> suggest >>> > >> > > > not >>> > >> > > > > > > > > changing >>> > >> > > > > > > > > > the basic logic of Mailbox and the default action >>> > since >>> > >> it is >>> > >> > > > > very >>> > >> > > > > > > > > critical >>> > >> > > > > > > > > > for performance. But yes, we will try our best to >>> > avoid >>> > >> > > > creating >>> > >> > > > > > > > > > intermediated objects. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 5. Could this FLIP provide the current >>> default >>> > >> values >>> > >> > > > for >>> > >> > > > > > > > things >>> > >> > > > > > > > > > like >>> > >> > > > > > > > > > > >> >> active buffer size thresholds and >>> timeouts? >>> > >> These >>> > >> > > could >>> > >> > > > > > help >>> > >> > > > > > > > with >>> > >> > > > > > > > > > > >> >> memory consumption and latency analysis. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Sure, we will introduce new configs as >>> well as >>> > >> their >>> > >> > > > > default >>> > >> > > > > > > > > value. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 6. Why do we need to record the hashcode >>> of a >>> > >> record >>> > >> > > in >>> > >> > > > > its >>> > >> > > > > > > > > > > >> >> RecordContext? It seems not used. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > The context switch before each callback >>> > execution >>> > >> > > > involves >>> > >> > > > > > > > > > setCurrentKey, where the hashCode is >>> re-calculated. We >>> > >> cache >>> > >> > > it >>> > >> > > > > for >>> > >> > > > > > > > > > accelerating. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> >> 7. 
In "timers can be stored on the JVM >>> heap or >>> > >> > > > RocksDB", >>> > >> > > > > > the >>> > >> > > > > > > > link >>> > >> > > > > > > > > > > >> >> points to a document in flink-1.15. It >>> might >>> > be >>> > >> > > better >>> > >> > > > to >>> > >> > > > > > > > verify >>> > >> > > > > > > > > > the >>> > >> > > > > > > > > > > >> >> referenced content is still valid in the >>> > latest >>> > >> Flink >>> > >> > > > and >>> > >> > > > > > > > update >>> > >> > > > > > > > > > the >>> > >> > > > > > > > > > > >> >> link accordingly. Same for other >>> references if >>> > >> any. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Thanks for the reminder! Will check. >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > Thanks a lot & Best, >>> > >> > > > > > > > > > > >> > Zakelly >>> > >> > > > > > > > > > > >> > >>> > >> > > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun >>> Karimov < >>> > >> > > > > > > > > je.kari...@gmail.com> >>> > >> > > > > > > > > > wrote: >>> > >> > > > > > > > > > > >> >> >>> > >> > > > > > > > > > > >> >> Hi, >>> > >> > > > > > > > > > > >> >> >>> > >> > > > > > > > > > > >> >> Thanks for the great proposals. I have a >>> few >>> > >> comments >>> > >> > > > > > > comments: >>> > >> > > > > > > > > > > >> >> >>> > >> > > > > > > > > > > >> >> - Backpressure Handling. Flink's original >>> > >> > > backpressure >>> > >> > > > > > > handling >>> > >> > > > > > > > > is >>> > >> > > > > > > > > > quite >>> > >> > > > > > > > > > > >> >> robust and the semantics is quite "simple" >>> > >> (simple is >>> > >> > > > > > > > beautiful). >>> > >> > > > > > > > > > > >> >> This mechanism has proven to perform >>> > >> better/robust >>> > >> > > than >>> > >> > > > > the >>> > >> > > > > > > > other >>> > >> > > > > > > > > > open >>> > >> > > > > > > > > > > >> >> source streaming systems, where they were >>> > >> relying on >>> > >> > > > some >>> > >> > > > > > > > > loopback >>> > >> > > > > > > > > > > >> >> information. >>> > >> > > > > > > > > > > >> >> Now that the proposal also relies on >>> loopback >>> > >> (yield >>> > >> > > in >>> > >> > > > > > this >>> > >> > > > > > > > > > case), it is >>> > >> > > > > > > > > > > >> >> not clear how well the new backpressure >>> > handling >>> > >> > > > proposed >>> > >> > > > > > in >>> > >> > > > > > > > > > FLIP-425 is >>> > >> > > > > > > > > > > >> >> robust and handle fluctuating workloads. >>> > >> > > > > > > > > > > >> >> >>> > >> > > > > > > > > > > >> >> - Watermark/Timer Handling: Similar >>> arguments >>> > >> apply >>> > >> > > for >>> > >> > > > > > > > watermark >>> > >> > > > > > > > > > and timer >>> > >> > > > > > > > > > > >> >> handling. IMHO, we need more benchmarks >>> > showing >>> > >> the >>> > >> > > > > > overhead >>> > >> > > > > > > > > > > >> >> of epoch management with different >>> parameters >>> > >> (e.g., >>> > >> > > > > window >>> > >> > > > > > > > size, >>> > >> > > > > > > > > > watermark >>> > >> > > > > > > > > > > >> >> strategy, etc) >>> > >> > > > > > > > > > > >> >> >>> > >> > > > > > > > > > > >> >> - DFS consistency guarantees. The >>> proposal in >>> > >> > > FLIP-427 >>> > >> > > > is >>> > >> > > > > > > > > > DFS-agnostic. >>> > >> > > > > > > > > > > >> >> However, different cloud providers have >>> > >> different >>> > >> > > > storage >>> > >> > > > > > > > > > consistency >>> > >> > > > > > > > > > > >> >> models. 
- DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic. However,
different cloud providers have different storage consistency models. How do we want to
deal with them?

Regards,
Jeyhun

On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Thanks Piotr for sharing your thoughts!

> I guess it depends how we would like to treat the local disks. I've always thought
> about them that almost always eventually all state from the DFS should end up cached
> in the local disks.

OK, I got it. In our proposal we treat the local disk as an optional cache, so the basic
design handles the case with state residing in DFS only. It is a more 'cloud-native'
approach that does not rely on any local-storage assumptions, which allows users to
dynamically adjust the capacity or I/O bound of the remote storage to gain performance
or save cost, even without a job restart.

> In the currently proposed more fine grained solution, you make a single request to DFS
> per each state access.

Ah, that's not accurate. Actually we buffer the state requests and process them in
batches, so multiple requests correspond to one DFS access (one block access for
multiple keys, performed by RocksDB).
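Roughly, the buffering works like the sketch below (class and method names are made up
for illustration; in the actual design the batched access goes through RocksDB block
reads rather than a pluggable loader):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    /** Hypothetical buffer that turns many single-key reads into one batched access. */
    public final class StateRequestBuffer<K, V> {

        private static final class Pending<K, V> {
            final K key;
            final CompletableFuture<V> future = new CompletableFuture<>();
            Pending(K key) { this.key = key; }
        }

        private final List<Pending<K, V>> pending = new ArrayList<>();
        private final int batchSize;
        private final Function<List<K>, Map<K, V>> batchLoader; // e.g. one multi-key block read

        public StateRequestBuffer(int batchSize, Function<List<K>, Map<K, V>> batchLoader) {
            this.batchSize = batchSize;
            this.batchLoader = batchLoader;
        }

        /** Enqueue one read; the returned future completes when the batch is executed. */
        public CompletableFuture<V> get(K key) {
            Pending<K, V> p = new Pending<>(key);
            pending.add(p);
            if (pending.size() >= batchSize) {
                drain();
            }
            return p.future;
        }

        /** Issue one batched access for all buffered keys and complete their futures. */
        public void drain() {
            if (pending.isEmpty()) {
                return;
            }
            List<K> keys = new ArrayList<>(pending.size());
            pending.forEach(p -> keys.add(p.key));
            Map<K, V> values = batchLoader.apply(keys);
            pending.forEach(p -> p.future.complete(values.get(p.key)));
            pending.clear();
        }
    }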
> In that benchmark you mentioned, are you requesting the state asynchronously from
> local disks into memory? If the benefit comes from parallel I/O, then I would expect
> the benefit to disappear/shrink when running multiple subtasks on the same machine, as
> they would be making their own parallel requests, right? Also enabling checkpointing
> would further cut into the available I/O budget.

That's an interesting topic. Our proposal is specifically aimed at the scenario where
the machine I/O is not fully loaded but the I/O latency has indeed become a bottleneck
for each subtask. The distributed file system is a prime example of a scenario
characterized by abundant and easily scalable I/O bandwidth coupled with higher I/O
latency. You may expect to increase the parallelism of a job to enhance performance as
well, but that also wastes more CPU and memory on building up more subtasks. This is one
drawback of computation-storage tightly coupled nodes. In our proposal, with the parallel
I/O and all the callbacks still running in one task, the pre-allocated computational
resources are better utilized. It is a much more lightweight way to perform parallel I/O.
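As a rough model of that execution style (a simplified stand-in, not the FLIP-425
implementation): reads are issued to an I/O pool in parallel, while their callbacks are
handed back to, and executed on, the single task thread.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Consumer;
    import java.util.function.Supplier;

    /** Simplified model: parallel I/O, but all callbacks run on the single task thread. */
    public final class SingleTaskParallelIo {

        private final ExecutorService ioPool = Executors.newFixedThreadPool(8);
        private final BlockingQueue<Runnable> callbackQueue = new LinkedBlockingQueue<>();

        /** Issue one asynchronous read; the callback is queued for the task thread. */
        public <V> void readAsync(Supplier<V> read, Consumer<V> callback) {
            CompletableFuture
                    .supplyAsync(read, ioPool)
                    .thenAccept(value -> callbackQueue.add(() -> callback.accept(value)));
        }

        /** Called from the task thread (mailbox-style loop) to run finished callbacks. */
        public void drainCallbacks() {
            Runnable next;
            while ((next = callbackQueue.poll()) != null) {
                next.run();
            }
        }
    }

So the extra threads only wait on I/O; the user-visible processing still happens on one
thread per task, which is why the pre-allocated CPU and memory are reused instead of
being duplicated across more subtasks.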
> Just with what granularity those async requests should be made. Making state access
> asynchronous is definitely the right way to go!

I think the current proposal is based on these core ideas:

- A pure cloud-native disaggregated state.
- Fully utilize the given resources and try not to waste them (including I/O).
- The ability to scale isolated resources (I/O or CPU or memory) independently.

We think a fine-grained granularity is more in line with those ideas, especially without
local-disk assumptions and without any I/O wasted on prefetching. Please note that it is
not a replacement of the original local state with synchronous execution. Instead, this
is a solution embracing the cloud-native era, providing much more scalability and
resource efficiency when handling a *huge state*.

> What also worries me a lot in this fine grained model is the effect on the
> checkpointing times.

Your concerns are very reasonable. Faster checkpointing is always a core advantage of
disaggregated state, but only for the async phase. There will be some complexity
introduced by in-flight requests, but I'd suggest a checkpoint containing those in-flight
state requests as part of the state, to accelerate the sync phase by skipping the buffer
draining. This makes the buffer size have little impact on the checkpoint time. All the
changes stay within the execution model we proposed, and the checkpoint barrier alignment
and handling will not be touched in our proposal, so I guess the complexity is relatively
controllable. I have faith in that :)
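To sketch the idea of checkpointing the in-flight requests instead of draining them
(illustrative names and layout only, not the actual checkpoint format): the pending
requests are written out during the sync phase and re-submitted on restore.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Deque;

    /** Illustrative only: snapshot pending state requests instead of draining them. */
    public final class InFlightRequests {

        /** A pending request, reduced to what is needed to re-issue it after restore. */
        public record PendingGet(long callbackId, byte[] serializedKey) {}

        private final Deque<PendingGet> pending = new ArrayDeque<>();

        public void enqueue(PendingGet request) {
            pending.add(request);
        }

        /** Sync phase: write the buffer out; no need to wait for the requests to finish. */
        public void snapshot(DataOutputStream out) throws IOException {
            out.writeInt(pending.size());
            for (PendingGet p : pending) {
                out.writeLong(p.callbackId());
                out.writeInt(p.serializedKey().length);
                out.write(p.serializedKey());
            }
        }

        /** Restore: read the buffer back and re-submit each request to the async executor. */
        public void restore(DataInputStream in) throws IOException {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                long callbackId = in.readLong();
                byte[] key = new byte[in.readInt()];
                in.readFully(key);
                pending.add(new PendingGet(callbackId, key));
            }
        }
    }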
> Also regarding the overheads, it would be great if you could provide profiling results
> of the benchmarks that you conducted to verify the results. Or maybe if you could
> describe the steps to reproduce the results? Especially "Hashmap (sync)" vs "Hashmap
> with async API".

Yes, we could profile the benchmarks. As for the comparison of "Hashmap (sync)" vs
"Hashmap with async API", we run a WordCount job written with the async APIs but with
the async execution disabled, by directly completing the future using synchronous state
access. This evaluates the overhead of the newly introduced modules like the 'AEC' in
sync execution (even though they are not designed for it). The code will be provided
later. For the other results of our PoC[1], you can follow the instructions here[2] to
reproduce them. Since the compilation may take some effort, we will directly provide the
jar for testing next week.
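Concretely, "directly completing the future using synchronous state access" means
something like the following simplified stand-in (illustrative names, not the actual
benchmark code):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    /** Toy async-looking state view used to measure pure framework overhead. */
    public final class SyncBackedAsyncValueState<V> {

        private final Supplier<V> syncRead; // e.g. a HashMap-backed synchronous lookup

        public SyncBackedAsyncValueState(Supplier<V> syncRead) {
            this.syncRead = syncRead;
        }

        /**
         * Looks asynchronous to the caller, but the future is completed immediately
         * with the result of the synchronous read, so no actual async I/O happens.
         */
        public CompletableFuture<V> value() {
            return CompletableFuture.completedFuture(syncRead.get());
        }
    }

Because every future is already completed when it is returned, the job exercises the new
machinery (buffers, callbacks, 'AEC') without any real asynchrony, isolating its overhead.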
And @Yunfeng Zhou, I have noticed your mail, but it is a bit late in my local time and
the next few days are the weekend, so I will reply to you later. Thanks for your
response!

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC

Best,
Zakelly

On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi,

Thanks for proposing this design! I just read FLIP-424 and FLIP-425 and have some
questions about the proposed changes.

For Async API (FLIP-424)

1. Why do we need a close() method on StateIterator? This method seems unused in the
usage example code.

2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null entries are
allowed". It might be better to further explain what will happen if a null value is
passed: is the value ignored in the returned Collection, or is an exception thrown?
Given that FutureUtils.emptyFuture() can be returned in the example code, I suppose the
former might be correct.
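For reference, the "ignore" semantics I have in mind could look like the sketch below,
written against plain CompletableFutures (generic Java for illustration only, not the
FLIP's FutureUtils); the alternative would be to fail the combined future, e.g. via
Objects.requireNonNull, when a null result shows up.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public final class CombineAllExample {

        /**
         * Combines the given futures; results that complete with null are simply left
         * out of the returned collection (the "ignore" semantics).
         */
        public static <T> CompletableFuture<Collection<T>> combineAll(
                Collection<CompletableFuture<T>> futures) {
            return CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> {
                        List<T> results = new ArrayList<>();
                        for (CompletableFuture<T> future : futures) {
                            T value = future.join();
                            if (value != null) {
                                results.add(value);
                            }
                        }
                        return results;
                    });
        }
    }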
For Async Execution (FLIP-425)

1. According to Fig 2 of this FLIP, if a recordB has its key collide with an ongoing
recordA, its processElement() method can still be triggered immediately, and then it
might be moved to the blocking buffer in the AEC if it involves state operations. This
means that recordB's output will precede recordA's output in downstream operators, if
recordA involves state operations while recordB does not. This will harm the correctness
of Flink jobs in some use cases. For example, in dim table join cases, recordA could be
a delete operation that involves state access, while recordB could be an insert operation
that needs to visit external storage without state access. If recordB's output precedes
recordA's, then an entry that is supposed to finally exist with recordB's value in the
sink table might actually be deleted according to recordA's command. This situation
should be avoided, and the order of same-key records should be strictly preserved.
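To make the expectation concrete, here is an illustrative sketch (not the FLIP's actual
blocking buffer): a record whose key is already in flight is parked and released only
after the earlier record for that key completes.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    /** Illustrative per-key ordering guard: same-key records run in arrival order. */
    public final class PerKeyOrderingBuffer<K, R> {

        private final Map<K, Deque<R>> blocked = new HashMap<>();
        private final Consumer<R> process;

        public PerKeyOrderingBuffer(Consumer<R> process) {
            this.process = process;
        }

        /** Process now, or park the record if an earlier record with the same key is ongoing. */
        public void onRecord(K key, R record) {
            Deque<R> queue = blocked.get(key);
            if (queue == null) {
                blocked.put(key, new ArrayDeque<>()); // mark the key as in flight
                process.accept(record);
            } else {
                queue.add(record); // preserve arrival order for this key
            }
        }

        /** Called when the in-flight record for the key completes; releases the next one, if any. */
        public void onComplete(K key) {
            Deque<R> queue = blocked.get(key);
            if (queue == null) {
                return;
            }
            R next = queue.poll();
            if (next == null) {
                blocked.remove(key);
            } else {
                process.accept(next);
            }
        }
    }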
2. The FLIP says that StateRequests submitted by Callbacks will not invoke further
yield() methods. Given that yield() is used when there is "too much" in-flight data, does
it mean StateRequests submitted by Callbacks will never be "too much"? What if the total
number of StateRequests exceeds the capacity of the Flink operator's memory space?

3. In the "Watermark" section, this FLIP provides an out-of-order execution mode apart
from the default strictly-ordered mode, which can optimize performance by allowing more
concurrent executions.

3.1 I'm concerned that the out-of-order execution mode, along with the epoch mechanism,
would bring more complexity to the execution model than the performance improvement it
promises. Could we add some benchmark results proving the benefit of this mode?

3.2 The FLIP might need to add a public API section describing how users or developers
can switch between these two execution modes (a hypothetical sketch of such a switch
follows after 3.3).

3.3 Apart from the watermark and checkpoint mentioned in this FLIP, there are also more
other events that might appear in the stream of data records. It might be better to
generalize the execution mode mechanism to handle all possible events.
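For 3.2, what I have in mind is roughly a configuration option like the following; the
key, enum and default value here are purely hypothetical and not existing Flink options,
the FLIP would have to define the real ones:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    /** Purely hypothetical option for switching FLIP-425's execution modes. */
    public final class AsyncExecutionOptions {

        /** Hypothetical enum; the FLIP would define the real modes and their names. */
        public enum OrderMode {
            STRICTLY_ORDERED,
            OUT_OF_ORDER
        }

        // Hypothetical key: not an existing Flink configuration option.
        public static final ConfigOption<OrderMode> EXECUTION_ORDER_MODE =
                ConfigOptions.key("execution.async-state.order-mode")
                        .enumType(OrderMode.class)
                        .defaultValue(OrderMode.STRICTLY_ORDERED)
                        .withDescription(
                                "Whether records under the asynchronous execution model "
                                        + "are processed strictly in order or may be "
                                        + "reordered across watermarks/epochs.");
    }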
In "timers can be stored on the JVM >>> > heap >>> > >> or >>> > >> > > > > > RocksDB", >>> > >> > > > > > > > the >>> > >> > > > > > > > > > link >>> > >> > > > > > > > > > > >> >> > > points to a document in flink-1.15. It >>> > >> might be >>> > >> > > > > better >>> > >> > > > > > to >>> > >> > > > > > > > > > verify the >>> > >> > > > > > > > > > > >> >> > > referenced content is still valid in >>> the >>> > >> latest >>> > >> > > > Flink >>> > >> > > > > > and >>> > >> > > > > > > > > > update the >>> > >> > > > > > > > > > > >> >> > > link accordingly. Same for other >>> > references >>> > >> if >>> > >> > > any. >>> > >> > > > > > > > > > > >> >> > > >>> > >> > > > > > > > > > > >> >> > > Best, >>> > >> > > > > > > > > > > >> >> > > Yunfeng Zhou >>> > >> > > > > > > > > > > >> >> > > >>> > >> > > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan >>> Mei < >>> > >> > > > > > > > > > yuanmei.w...@gmail.com> wrote: >>> > >> > > > > > > > > > > >> >> > > > >>> > >> > > > > > > > > > > >> >> > > > Hi Devs, >>> > >> > > > > > > > > > > >> >> > > > >>> > >> > > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei, >>> > Zakelly >>> > >> Lan, >>> > >> > > > > > Jinzhong >>> > >> > > > > > > > Li, >>> > >> > > > > > > > > > Hangxiang >>> > >> > > > > > > > > > > >> >> > Yu, >>> > >> > > > > > > > > > > >> >> > > > Yanfei Lei and Feng Wang. We'd like >>> to >>> > >> start a >>> > >> > > > > > > discussion >>> > >> > > > > > > > > > about >>> > >> > > > > > > > > > > >> >> > > introducing >>> > >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and >>> > >> Management in >>> > >> > > > Flink >>> > >> > > > > > > 2.0. >>> > >> > > > > > > > > > > >> >> > > > >>> > >> > > > > > > > > > > >> >> > > > The past decade has witnessed a >>> dramatic >>> > >> shift >>> > >> > > in >>> > >> > > > > > > Flink's >>> > >> > > > > > > > > > deployment >>> > >> > > > > > > > > > > >> >> > > mode, >>> > >> > > > > > > > > > > >> >> > > > workload patterns, and hardware >>> > >> improvements. >>> > >> > > > We've >>> > >> > > > > > > moved >>> > >> > > > > > > > > > from the >>> > >> > > > > > > > > > > >> >> > > > map-reduce era where workers are >>> > >> > > > > computation-storage >>> > >> > > > > > > > > tightly >>> > >> > > > > > > > > > coupled >>> > >> > > > > > > > > > > >> >> > > nodes >>> > >> > > > > > > > > > > >> >> > > > to a cloud-native world where >>> > >> containerized >>> > >> > > > > > deployments >>> > >> > > > > > > > on >>> > >> > > > > > > > > > Kubernetes >>> > >> > > > > > > > > > > >> >> > > > become standard. To enable Flink's >>> > >> Cloud-Native >>> > >> > > > > > future, >>> > >> > > > > > > > we >>> > >> > > > > > > > > > introduce >>> > >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and >>> > >> Management that >>> > >> > > > > uses >>> > >> > > > > > > DFS >>> > >> > > > > > > > as >>> > >> > > > > > > > > > primary >>> > >> > > > > > > > > > > >> >> > > storage >>> > >> > > > > > > > > > > >> >> > > > in Flink 2.0, as promised in the >>> Flink >>> > 2.0 >>> > >> > > > Roadmap. >>> > >> > > > > > > > > > > >> >> > > > >>> > >> > > > > > > > > > > >> >> > > > Design Details can be found in >>> > >> FLIP-423[1]. 
This new architecture is aimed at solving the following challenges brought by the
cloud-native era for Flink:
1. Local Disk Constraints in containerization
2. Spiky Resource Usage caused by compaction in the current state model
3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
4. Light and Fast Checkpoint in a native way

More specifically, we want to reach a consensus on the following issues in this
discussion:
1. Overall design
2. Proposed Changes
3. Design details to achieve Milestone1

In M1, we aim to achieve an end-to-end baseline version using DFS as primary storage and
complete the core functionalities, including:

- Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for asynchronous state
access.
- Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking execution model
leveraging the asynchronous APIs introduced in FLIP-424.
- Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote state data in
batches to avoid unnecessary round-trip costs for remote access.
- Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of the ForSt
disaggregated state store.
- Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate checkpointing mechanisms
with the disaggregated state store for fault tolerance and fast rescaling.

We will vote on each FLIP in separate threads to make sure each FLIP reaches a consensus.
But we want to keep the discussion within a focused thread (this thread) for easier
tracking of context, to avoid duplicated questions/discussions, and also to think of the
problem/solution in the full picture.

Looking forward to your feedback.

Best,
Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng

[1] https://cwiki.apache.org/confluence/x/R4p3EQ
[2] https://cwiki.apache.org/confluence/x/SYp3EQ
[3] https://cwiki.apache.org/confluence/x/S4p3EQ
[4] https://cwiki.apache.org/confluence/x/TYp3EQ
[5] https://cwiki.apache.org/confluence/x/T4p3EQ
[6] https://cwiki.apache.org/confluence/x/UYp3EQ