Hi Xintong, Thank you for all the comments. Please see my reply inline.
On Mon, Sep 18, 2023 at 11:31 AM Xintong Song <tonysong...@gmail.com> wrote: > Thanks for addressing my comments, Dong. > > The expected behavior of checkpointing and failover depends on whether > > there is any operator currently running in the job with all its inputs' > > isBacklog=true. If there exists such an operator and > > interval-during-backlog = 0, then checkpoint will be disabled and the > > operator will have to failover in a way similar to batch mode. > > > This makes sense to me. Shall we also put this into the FLIP? Or maybe you > already did that and I overlooked it? The current description in "4) > Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to me. > It says "At the point when isBacklog switches to false, source operator > ...", which sounds like upon any source operator switching to isBacklog = > false. > I think it is kind of mentioned in the doc of execution.checkpointing.interval-during-backlog, which says "if it is not null and any source reports isProcessingBacklog=true, it is the interval...". Based on this doc, we can derive that if there is any source reporting isBacklog=true, then the checkpointing interval is determined by interval-during-backlog, which in this case has value 0, indicating that checkpoint triggering is disabled. Given that other readers might also have this question, I have updated FLIP-327 with the following statement to make it more explicit: "For jobs with multiple sources and execution.checkpointing.interval-during-backlog = 0, checkpoint triggering is enabled if and only if all sources have isBacklog=false". > I am not sure what is the concern with having `flink-streaming-java` depend > > on `flink-runtime`. Can you clarify the exact concern? > > > > The concern here is that an API module should not depend on a runtime > module.
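The "checkpoint triggering is enabled if and only if all sources have isBacklog=false" rule quoted above can be sketched as a small decision function. This is an illustrative model only, not actual Flink code; the class and method names are invented for this sketch:

```java
import java.util.List;

// Illustrative model of the checkpoint-triggering rule described above;
// not actual Flink code. All names here are invented for this sketch.
class BacklogCheckpointPolicy {

    /**
     * With execution.checkpointing.interval-during-backlog = 0, checkpoint
     * triggering is enabled if and only if all sources report isBacklog=false.
     */
    static boolean checkpointTriggeringEnabled(
            List<Boolean> sourceIsBacklog, long intervalDuringBacklogMs) {
        boolean anySourceBacklog = sourceIsBacklog.stream().anyMatch(b -> b);
        if (!anySourceBacklog) {
            return true; // the normal checkpointing interval applies
        }
        // During backlog, a zero interval means checkpoint triggering is disabled.
        return intervalDuringBacklogMs > 0;
    }
}
```

For example, with two sources where one still reports backlog and interval-during-backlog = 0, no checkpoint is triggered until both sources report isBacklog=false.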
Currently, we have the "user code -> flink-streaming-java -> > flink-runtime" dependency chain, which makes binary compatibility > impossible because any runtime changes can break the compatibility with a > user jar (which bundles flink-streaming-java) compiled for an older > version. Ideally, we want the runtime module to depend on the API module, > rather than the other way around. This is one of the issues we are trying > to resolve with the programmatic API refactor. However, the way we are > trying to resolve it is to introduce another API module and gradually > replace the current DataStream API / flink-streaming-java, which means > flink-streaming-java will stay depending on flink-runtime for a while > anyway. So the concern here is minor, only that we might need more effort > when reworking this with the new API. > Thanks for the detailed explanation. Given that we plan to avoid having flink-streaming-java depend on flink-runtime, I agree it is preferable to avoid introducing more dependencies like this. I have updated the FLIP to let RecordAttributes extend StreamElement. Best, Dong > The rest of your replies make sense to me. > > Best, > > Xintong > > > > On Fri, Sep 15, 2023 at 10:05 PM Dong Lin <lindon...@gmail.com> wrote: > > > Hi Xintong, > > > > Thanks for your comments! Please see my reply inline. > > > > On Thu, Sep 14, 2023 at 4:58 PM Xintong Song <tonysong...@gmail.com> > > wrote: > > > > > Sorry to join the discussion late. > > > > > > Overall, I think it's a good idea to support dynamically switching the > > > operator algorithms between Streaming (optimized towards low latency + > > > checkpointing support) and Batch (optimized towards throughput). This > is > > > indeed a big and complex topic, and I really appreciate the previous > > > discussions that narrowed the scope of this FLIP down to only considering > > > switching from Batch to Streaming as a first step. > > > > > > I have several questions. > > > > > > 1.
The FLIP discusses various behaviors under 4 scenarios: streaming > > mode, > > > batch mode, mixed mode with checkpoint interval > 0, mixed mode with > > > checkpoint interval = 0. IIUC, this is because many batch optimizations > > > cannot be supported together with checkpointing. This justifies that in > > > mixed mode with interval > 0, most behaviors are the same as in > streaming > > > mode. However, mixed mode with checkpoint interval = 0 does not > > > necessarily mean we should apply such optimization. It is possible that > > in > > > some cases (likely with small data amounts) the cost of such > > optimizations > > > is higher than the benefit. Therefore, I'd suggest decoupling the > > concept > > > of applying these optimizations (i.e., the batch execution phase in the > > > mixed mode) from whether checkpointing is enabled or not. In > particular, > > > I'd suggest removing the scenario "mixed mode with > > > e.c.interval-during-backlog > 0", changing the scenario "mixed mode > with > > > e.c.interval-during-backlog = 0" to simply "mixed mode", and say that we can > > > have different strategies for deciding whether to enable the mixed mode, > > and > > > as the first step the strategy is to enable it when > > > e.c.interval-during-backlog = 0. > > > > > > > Thanks for the detailed explanation! > > > > I have updated the "Behavior changes when switching from batch mode to > > stream mode" section with the following changes.
> > > > 1) Remove the description of "mixed mode with interval-during-backlog > > 0" > > and add the statement saying that "after this FLIP, the behavior of Flink > > runtime with execution.runtime-mode = streaming AND > > execution.checkpointing.interval-during-backlog > 0, will be the same as the > > stream mode prior to this FLIP" > > > > 2) Add the statement saying that "Mixed mode refers to the behavior of > > Flink runtime after this FLIP with execution.runtime-mode = streaming AND > > execution.checkpointing.interval-during-backlog = 0". > > > > 3) Add the statement saying that "It is possible for mixed mode to be > > slower than stream mode, particularly when there is only a small amount of > > input records and the overhead of buffering/sorting inputs outweighs its > > benefit. This is similar to how the merge join might be slower than hash > > join. This FLIP focuses on optimizing the Flink throughput when there is > a > > high number of input records. In the future, we might introduce more > > strategies to turn on mixed mode in a smart way to avoid performance > > regression". > > > > Would this address your concern? > > > > > > > > > 2. According to the FLIP, before isBacklog = false, the timer service > > only > > > keeps timers for the current key. It also says upon the end of each > key, > > it > > > fires timers of the key up to the last watermark. IIUC, that means not > > all > > > timers are guaranteed to be fired. It is possible that some timers are > > left > > > to be triggered after isBacklog switches to false. If the timer > service > > > only keeps timers for the current key, those unfired timers may get > > lost > > > when switching to a new key. > > > > > > > Thanks for catching this. You are right that all timers should be fired > as > > long as the corresponding firing condition (either processing-time or > > event-time) is satisfied.
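The corrected timer behavior discussed above (during backlog processing, keep timers only for the current key, fire the ones at or below the watermark when the key ends, and retain the rest so they are not lost) can be sketched as follows. This is an illustrative model, not the actual Flink timer service; all names are invented for this sketch:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch of per-key timer handling during backlog processing:
// timers are kept only for the current key and fired (up to the current
// watermark) when the operator moves on to the next key. Timers above the
// watermark must be retained so they can still fire later, rather than
// being lost on the key switch. Not actual Flink code.
class PerKeyTimerSketch {
    private final List<Long> currentKeyTimers = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        currentKeyTimers.add(timestamp);
    }

    /**
     * Called at the end of each key; returns the timers that fire now,
     * i.e. all registered timers with timestamp <= watermark. The rest
     * stay pending instead of being dropped.
     */
    List<Long> onKeyFinished(long watermark) {
        List<Long> fired = new ArrayList<>();
        Iterator<Long> it = currentKeyTimers.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts <= watermark) {
                fired.add(ts);
                it.remove();
            }
        }
        fired.sort(Long::compare);
        return fired;
    }

    List<Long> pendingTimers() {
        return new ArrayList<>(currentKeyTimers);
    }
}
```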
> > > > I have updated the "Timer Service" part of the "Behavior changes when > > switching from batch mode to stream mode" section accordingly. Can you > see > > if it addresses your concern? > > > > > > > > > > 3. Is it possible that some sources / operators in the job switch to > > > isBacklog = false, while others are still isBacklog = true? In that > case, > > > what is the expected behavior for checkpointing and failover? > > > > > > > Yes, it is possible. And in this case, Flink runtime will handle this > > operator as if all the operator's inputs have isBacklog=false. In > > particular, Flink runtime will not automatically sort inputs of this > > operator. > > > > I added the following statement in the FLIP to clarify the behavior: "For > > an operator with 2+ inputs, where some inputs have isBacklog=true and > some > > other inputs have isBacklog=false, Flink runtime will handle this > operator > > as if all its inputs have isBacklog=false". > > > > The expected behavior of checkpointing and failover depends on whether > > there is any operator currently running in the job with all its inputs' > > isBacklog=true. If there exists such an operator > > and interval-during-backlog = 0, then checkpoint will be disabled and the > > operator will have to failover in a way similar to batch mode. > > > > > > > > > > > > 4. Do we require RecordAttributes to be properly handled by all > > operators? > > > Or do we consider it as hints that operators may benefit from looking > > into > > > it but should not run into any problems ignoring it? I'm asking > because, > > if > > > they are required to be properly handled, we probably need a way to > > enforce > > > operators to deal with it. `processRecordAttributes(RecordAttributes)` > > > might not be a good fit because we don't know whether the operator has > > > looked into all necessary fields of `RecordAttributes`. 
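The multi-input rule stated above can be sketched as a one-line predicate: an operator is treated as processing backlog only when every one of its inputs reports isBacklog=true; with mixed inputs it behaves as if all inputs had isBacklog=false. Illustrative model only, not actual Flink code:

```java
import java.util.List;

// Illustrative model of the rule above; not actual Flink code.
class OperatorBacklogStatus {
    // The operator runs with backlog (batch-style) handling only if
    // ALL of its inputs report isBacklog=true.
    static boolean runsInBacklogMode(List<Boolean> inputIsBacklog) {
        return !inputIsBacklog.isEmpty()
                && inputIsBacklog.stream().allMatch(b -> b);
    }
}
```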
> > > > > > > As of this FLIP, we would not require RecordAttributes to be handled by any > > operator in order to achieve correctness. So it is more like a hint. More > > specifically, the isBacklog attribute provides a hint for an operator to > > optionally delay the processing of its inputs if doing so can improve its > > throughput. > > > > I think it would be useful to avoid requiring operators to explicitly > > handle attributes contained in RecordAttributes. This is because we > want > > the features added in this FLIP (and future FLIPs) to be backward > > compatible without breaking the correctness of existing jobs. > > > > If we really need to add a record attribute that should be > explicitly > > handled by every operator, I believe we can always find a way to enforce > > this requirement (e.g. fail job compilation with a proper error) in the > > future. For example, we can add a method such as > handleXXXRecordAttribute() > > in the operator interface without a default implementation. > > > > > > > > > 5. I wonder if there are any strong reasons behind choosing > `RuntimeEvent` > > > over `StreamElement` for `RecordAttributes` to extend? My concern is > > that, > > > the current approach introduces one more dependency from > > > `flink-streaming-java` (operators that use `RecordAttributes`) to > > > `flink-runtime` (where `RuntimeEvent` comes from), which seems to be > > > unnecessary. > > > > > > > There is no strong reason to choose `RuntimeEvent` over `StreamElement`. > I > > think the main (and minor) reason for doing so is the simplicity of > > implementation. For example, we don't need to add methods such > > as StreamElement#isRecordAttributes and StreamElement#asRecordAttributes. > > > > I am not sure what is the concern with having `flink-streaming-java` > depend > > on `flink-runtime`. Can you clarify the exact concern? > > > > In any case, I don't have a strong preference between `RuntimeEvent` and > > `StreamElement`.
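The hint semantics described above can be illustrated with a toy operator that optionally buffers records while isBacklog=true and flushes them when the backlog ends; an operator that ignores the hint entirely remains correct. This is not the actual Flink operator API; the class and method names are invented for this sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of treating the isBacklog attribute as a hint: the
// operator may buffer input while backlog processing is in effect and
// flush once the backlog ends. Ignoring the hint would also be correct,
// just potentially slower. Not the actual Flink operator API.
class BacklogAwareBuffer<T> {
    private final List<T> buffer = new ArrayList<>();
    private boolean backlog = false;

    /**
     * Mimics reacting to a RecordAttributes update: returns buffered
     * records to emit downstream when backlog switches from true to false.
     */
    List<T> onBacklogStatus(boolean isBacklog) {
        List<T> toEmit = new ArrayList<>();
        if (backlog && !isBacklog) {
            toEmit.addAll(buffer);
            buffer.clear();
        }
        backlog = isBacklog;
        return toEmit;
    }

    /** Returns records to emit now: empty while buffering during backlog. */
    List<T> onRecord(T record) {
        if (backlog) {
            buffer.add(record);
            return List.of();
        }
        return List.of(record);
    }
}
```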
We can update the FLIP to use `StreamElement` as long as > > there is a well-defined non-trivial reason for making this choice. > > > > > > > 6. The FLIP says it leverages state backend optimizations introduced in > > > FLIP-325. Just for clarification, does this mean this FLIP depends > > on > > > FLIP-325, and probably should not be voted / accepted until FLIP-325 is > > > accepted? > > > > > > > Yes, the proposed implementation of this FLIP depends on FLIP-325. We can > > start the voting thread for FLIP-327 after FLIP-325 is accepted. Maybe we can > > continue to discuss this FLIP in the meantime (before FLIP-325 is > > accepted). > > > > Thanks again for the very detailed and helpful review! Looking forward to > > your follow-up comments. > > > > Best, > > Dong > > > > > > > Best, > > > > > > Xintong > > > > > > > > > > > > On Fri, Sep 1, 2023 at 12:48 AM Jan Lukavský <je...@seznam.cz> wrote: > > > > > > > Hi, > > > > > > > > some keywords in this triggered my attention, so sorry for jumping > > > > in late, but I'd like to comprehend the nature of the proposal. > > > > > > > > I'll try to summarize my understanding: > > > > > > > > The goal of the FLIP is to support automatic switching between > > streaming > > > > and batch processing, leveraging the fact that batch processing is > more > > > > computationally efficient. This makes perfect sense. > > > > > > > > Looking at the streaming vs.
batch semantics, switching from > streaming > > > > to batch means the following: > > > > > > > > a) generally, watermarks are not propagated in batch, the watermark > moves > > > > from -inf to +inf in one step, at the end of the batch input, this might > > > > (and probably will) skip many invocations of timers > > > > > > > > b) grouping by key (and window) can be done efficiently, because it > > > > can be done by sort-group and ideally parallelized by window (with > some > > > > caveats) > > > > > > > > The switch also has some conditions, namely: > > > > > > > > i) batch mode does not do checkpoints, inputs must be accessible > > > > repeatedly (forever) > > > > > > > > ii) due to failures in batch mode, inputs might be reprocessed and > > > > thus must be immutable, or all sub-results computed in all branches of > > > > the computation (even those possibly unaffected by the failure) have to be > > > > discarded and recomputed from scratch > > > > > > > > Obviously, in the case of the switch from batch to streaming, > property > > > > a) has to be modified so the watermark does not move to +inf, but to > > > > min(streaming watermark). Given these properties, it should be > > possible > > > > to exchange batch and streaming processing without any cooperation > with > > > > the application logic itself. Is my understanding correct? > > > > > > > > If so, there is still one open question regarding efficiency, though. The > > > > streaming operator _might_ need sorting by timestamp (e.g. processing > > > > time-series data, or even sequential data). In that case simply > > > > switching streaming semantics to batch processing does not yield > > > > efficient processing, because the operator still needs to buffer and > > > > manually sort all the input data (batch data is always unordered). On > > > > the other hand, the batch runner already does sorting (for grouping > by > > > > key), so adding an additional sorting criterion is very cheap.
In Apache > > > > Beam, we introduced a property of a stateful PTransform (DoFn) called > > > > @RequiresTimeSortedInput [1], which can then be implemented > efficiently > > > > by batch engines. > > > > > > > > Does the FLIP somehow work with conditions i) and ii)? I can imagine, > > for > > > > instance, that if data is read from, say, Kafka, then if the backlog gets > > > > sufficiently large, even the batch processing can take > substantial > > > > time, and if it fails after long processing, some of the original data > > > > might already be rolled out of the Kafka topic. > > > > > > > > In the FLIP there are some proposed changes to sources to emit > metadata > > > > about whether the records come from backlog. What is the driving line of > > > > thought behind why this is needed? From my point of view, streaming engines > are > > > > _always_ processing backlog, the only question is "how delayed are > the > > > > currently processed events after HEAD", or more specifically in this > > > > case "how many elements can we expect to process if the source would > > > > immediately stop receiving more data?". This should be configurable > > > > using a simple option defining the difference between the current > > > > processing-time (JM) and the watermark of the source, or am I missing > > > > something? > > > > > > > > Thanks for the clarification and all the best, > > > > > > > > Jan > > > > > > > > [1] > > > > > > > > > > > > > > https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html > > > > > > > > On 8/31/23 13:17, Xuannan Su wrote: > > > > > Hi all, > > > > > > > > > > I would like to share some updates on FLIP-327. Dong and I have > had a > > > > > series of discussions and have made several refinements to the > FLIP. > > > > > > > > > > The major change to the FLIP is to allow the input of the one-input > > > > > operator to be automatically sorted during backlog processing.
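The sorting requirement Jan describes can be illustrated with a toy model that buffers records during backlog processing and hands them to the operator in timestamp order once the backlog ends, in the spirit of Beam's @RequiresTimeSortedInput. This is not actual Flink or Beam code; all names are invented for this sketch:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of time-sorted input delivery during backlog processing:
// records accumulate while the backlog lasts and are delivered to the
// operator sorted by timestamp once the backlog ends. A real engine
// would sort externally with bounded memory; this sketch sorts in memory.
class TimeSortedInputSketch {
    static final class StampedRecord {
        final long timestamp;
        final String value;

        StampedRecord(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final List<StampedRecord> buffered = new ArrayList<>();

    void collectDuringBacklog(long timestamp, String value) {
        buffered.add(new StampedRecord(timestamp, value));
    }

    /** Called when backlog ends: drains the buffer in timestamp order. */
    List<StampedRecord> drainSorted() {
        List<StampedRecord> out = new ArrayList<>(buffered);
        out.sort(Comparator.comparingLong(r -> r.timestamp));
        buffered.clear();
        return out;
    }
}
```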
When > > > > > combined with the state backend optimization introduced in FLIP-325 > > > [1], > > > > > all the keyed single-input operators can achieve similar > performance > > as > > > > in > > > > > batch mode during backlog processing without any code change to the > > > > > operator. We also implemented a POC[2] and conducted benchmark[3] > > using > > > > the > > > > > KeyedStream#reduce operation. The benchmark results demonstrate the > > > > > performance gains that this FLIP can offer. > > > > > > > > > > I am looking forward to any comments or feedback you may have on > this > > > > FLIP. > > > > > > > > > > Best, > > > > > Xuannan > > > > > > > > > > [1] > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access > > > > > [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo > > > > > [3] > > > > > > > > > > > > > > > https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java > > > > > > > > > > > > > > > > > > > >> On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote: > > > > >> > > > > >> Hi Piotr, > > > > >> > > > > >> Thanks for the explanation. > > > > >> > > > > >> To recap our offline discussion, there is a concern regarding the > > > > >> capability to dynamically switch between stream and batch modes. > > This > > > > >> concern is around unforeseen behaviors such as bugs or performance > > > > >> regressions, which we might not yet be aware of yet. The reason > for > > > this > > > > >> concern is that this feature involves a fundamental impact on the > > > Flink > > > > >> runtime's behavior. > > > > >> > > > > >> Due to the above concern, I agree it is reasonable to annotate > > related > > > > > APIs > > > > >> as experimental. 
This step would provide us with the flexibility > to > > > > modify > > > > >> these APIs if issues arise in the future. This annotation also > > serves > > > > as a > > > > >> note to users that this functionality might not perform as well as > > > > expected. > > > > >> > > > > >> Though I believe that we can ensure the reliability of this > feature > > > > > through > > > > >> good design and code reviews, comprehensive unit tests, and > thorough > > > > >> integration testing, I agree that it is reasonable to be extra > > > cautious > > > > in > > > > >> this case. Also, it should be OK to delay making these APIs > > > > >> non-experimental by 1-2 releases. > > > > >> > > > > >> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in > > these > > > > docs > > > > >> as experimental. Please let me know if you think any other API > > should > > > > also > > > > >> be marked as experimental. > > > > >> > > > > >> Thanks! > > > > >> Dong > > > > >> > > > > >> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski < > > > > piotr.nowoj...@gmail.com> > > > > >> wrote: > > > > >> > > > > >>> Hi Dong, > > > > >>> > > > > >>> The operators API is unfortunately also a public-facing API, and I > > mean > > > > that the > > > > >>> APIs we will add there should also be marked `@Experimental` > > > IMO. > > > > >>> > > > > >>> The config options should also be marked as experimental (both > > > > >>> annotated @Experimental and noted the same thing in the docs, > > > > >>> if the @Experimental annotation is not automatically mentioned in the > > > > docs). > > > > >>> > > > > >>>> Alternatively, how about we add a doc for > > > > >>> checkpointing.interval-during-backlog explaining its > impact/concern > > > as > > > > >>> discussed above?
> > > > >>> > > > > >>> We should do this independently from marking the APIs/config > > options > as > > > > >>> `@Experimental` > > > > >>> > > > > >>> Best, > > > > >>> Piotrek > > > > >>> > > > > >>> On Fri, Aug 11, 2023 at 2:55 PM Dong Lin <lindon...@gmail.com> > > wrote: > > > > >>> > > > > >>>> Hi Piotr, > > > > >>>> > > > > >>>> Thanks for the reply! > > > > >>>> > > > > >>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski < > > > > piotr.nowoj...@gmail.com > > > > >>>> > > > > >>>> wrote: > > > > >>>> > > > > >>>>> Hi, > > > > >>>>> > > > > >>>>> Sorry for the long delay in responding! > > > > >>>>> > > > > >>>>>> Given that it is an optional feature that can be > > > > >>>>>> turned off by users, it might be OK to just let users try it > out > > > and > > > > >>> we > > > > >>>>> can > > > > >>>>>> fix performance issues once we detect any of them. What do you > > > > think? > > > > >>>>> I think it's fine. It would be best to mark this feature as > > > > >>> experimental, > > > > >>>>> and > > > > >>>>> we say that the config keys or the default values might change > in > > > the > > > > >>>>> future. > > > > >>>>> > > > > >>>> In general I agree we can mark APIs that determine "whether to > > > enable > > > > >>>> dynamic switching between stream/batch mode" as experimental. > > > > >>>> > > > > >>>> However, I am not sure we have such an API yet. The APIs added > in > > > this > > > > >>> FLIP > > > > >>>> are intended to be used by operator developers rather than end > > > users. > > > End > > > > >>>> users can enable this capability by setting > > > > >>>> execution.checkpointing.interval-during-backlog = Long.MAX and > > using > a > > > > >>>> source which might implicitly set the backlog status (e.g. > > HybridSource). > > So > > > > >>>> execution.checkpointing.interval-during-backlog is the only > > > > user-facing > > > > >>>> API that can always control whether this feature can be used.
> > > > >>>> > > > > >>>> However, execution.checkpointing.interval-during-backlog itself is > > > not > > >>> tied > > > > >>>> to FLIP-327. > > > > >>>> > > > > >>>> Do you mean we should mark checkpointing.interval-during-backlog as > > > > >>>> experimental? Alternatively, how about we add a doc for > > > > >>>> checkpointing.interval-during-backlog explaining its > > impact/concern > as > > > > >>>> discussed above? > > > > >>>> > > > > >>>> Best, > > > > >>>> Dong > > > > >>>> > > > > >>>> > > > > >>>>>> Maybe we can revisit the need for such a config when we > > > > >>>> introduce/discuss > > > > >>>>>> the capability to switch backlog from false to true in the > > future. > > > > >>> What > > > > >>>>> do > > > > >>>>>> you think? > > > > >>>>> Sure, we can do that. > > > > >>>>> > > > > >>>>> Best, > > > > >>>>> Piotrek > > > > >>>>> > > > > >>>>> On Sun, Jul 23, 2023 at 2:32 PM Dong Lin <lindon...@gmail.com> > > > > wrote: > > > > >>>>> > > > > >>>>>> Hi Piotr, > > > > >>>>>> > > > > >>>>>> Thanks a lot for the explanation. Please see my reply inline. > > > > >>>>>> > > > > >>>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski < > > > > >>>>> piotr.nowoj...@gmail.com> > > > > >>>>>> wrote: > > > > >>>>>> > > > > >>>>>>> Hi Dong, > > > > >>>>>>> > > > > >>>>>>> Thanks a lot for the answers. I can now only briefly answer > > your > > > > >>> last > > > > >>>>>>> email. > > > > >>>>>>> > > > > >>>>>>>> It is possible that spilling to disks might cause larger > > > > >>> overhead. > > > > >>>>> IMO > > > > >>>>>> it > > > > >>>>>>>> is an orthogonal issue already existing in Flink. This is > > > > >>> because a > > > > >>>>>> Flink > > > > >>>>>>>> job running batch mode might also be slower than its > > throughput > > > > >>> in > > > > >>>>>> stream > > > > >>>>>>>> mode due to the same reason.
> > > > >>>>>>> Yes, I know, but the thing that worries me is that previously > only > > > a > > > > >>>>> user > > > > >>>>>>> alone > > > > >>>>>>> could decide whether to use batch mode or streaming, and in > > > > >>> practice > > > > >>>>> one > > > > >>>>>>> user would rarely (if ever) use both for the same > > > > >>> problem/job/query. > > > > >>>> If > > > > >>>>>> his > > > > >>>>>>> intention was to eventually process live data, he was using > > > > >>> streaming > > > > >>>>>> even > > > > >>>>>>> if there was a large backlog at the start (apart from some very > few > > > > >>>> > > > > >>>>>>> power > > > > >>>>>>> users). > > > > >>>>>>> > > > > >>>>>>> With this change, we want to introduce a mode that would be > > > > >>> switching > > > > >>>>>> back > > > > >>>>>>> and forth between streaming and "batch in streaming" > > > automatically. > > > > >>>> So > > > > >>>>> a > > > > >>>>>>> potential performance regression would be much more visible > and > > > > >>>> painful > > > > >>>>>>> at the same time. If a batch query runs slower than it could, > > it's > > > > >>> kind > > > > >>>>> of > > > > >>>>>>> fine, as > > > > >>>>>>> it will end at some point. If a streaming query switches to batch processing during large back > > > > >>>>> pressure, > > > > >>>>>>> maybe a > > > > >>>>>>> temporary load spike, that's a > > > bigger > > > > >>>>> deal. > > > > >>>>>>> Especially if the batch processing mode will not be able to > > actually > > > > >>> even > > > > >>>>>>> handle > > > > >>>>>>> the normal load, after the load spike. In that case, the job > > > could > > > > >>>>> never > > > > >>>>>>> recover > > > > >>>>>>> from the backpressure/backlog mode. > > > > >>>>>>> > > > > >>>>>> I understand you are concerned with the risk of performance > > > > >>> regression > > > > >>>>>> introduced due to switching to batch mode.
> > > > >>>>>> > > > > >>>>>> After thinking about this more, I think this existing proposal > > > meets > > > > >>>> the > > > > >>>>>> minimum requirement of "not introducing regression for > existing > > > > >>> jobs". > > > > >>>>> The > > > > >>>>>> reason is that even if batch mode can be slower than stream > mode > > > for > > > > >>>> some > > > > >>>>>> operators in some cases, this is an optional feature that will > > > only > > > > >>> be > > > > >>>>>> enabled if a user explicitly overrides the newly introduced > > config > > > > to > > > > >>>>>> non-default values. Existing jobs that simply upgrade their > > Flink > > > > >>>> library > > > > >>>>>> version will not suffer any performance regression. > > > > >>>>>> > > > > >>>>>> More specifically, in order to switch to batch mode, users > will > > > need > > > > >>> to > > > > >>>>>> explicitly set execution.checkpointing.interval-during-backlog > > to > > > 0. > > > > >>>> And > > > > >>>>>> users can always explicitly update > > > > >>>>>> execution.checkpointing.interval-during-backlog to turn off > the > > > > batch > > > > >>>>> mode > > > > >>>>>> if that incurs any performance issue. > > > > >>>>>> > > > > >>>>>> As far as I can tell, for all practical workloads we see in > > > > >>> production > > > > >>>>>> jobs, batch mode is always faster (w.r.t. throughput) than > > stream > > > > >>> mode > > > > >>>>> when > > > > >>>>>> there is a high backlog of incoming records. Though it is > still > > > > >>>>>> theoretically possible, it should be very rare (if any) for > > batch > > > > >>> mode > > > > >>>> to > > > > >>>>>> be slower in practice. Given that it is an optional feature > that > > > can > > > > >>> be > > > > >>>>>> turned off by users, it might be OK to just let users try it > out > > > and > > > > >>> we > > > > >>>>> can > > > > >>>>>> fix performance issues once we detect any of them. What do you > > > > think? 
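To make the opt-in nature discussed above concrete: enabling the batch-style backlog processing amounts to a configuration change along these lines. This is a hypothetical flink-conf.yaml fragment; the exact option values and duration syntax are assumptions for illustration, not verified Flink configuration:

```yaml
# Streaming job that processes its initial backlog with batch-style optimizations.
execution.runtime-mode: STREAMING
# Regular checkpointing interval once the backlog has been processed.
execution.checkpointing.interval: 3min
# 0 disables checkpoint triggering while any source reports backlog,
# which enables the batch-style "mixed mode" described in the FLIP.
# Setting a non-zero value turns the batch-style behavior off again.
execution.checkpointing.interval-during-backlog: 0
```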
> > > > >>>>>> > > > > >>>>>>>> execution.backlog.use-full-batch-mode-on-start (default > false) > > > > >>>>>>> Oops, sorry, it was supposed to be something like: > > > > >>>>>>> > > > > >>>>>>> execution.backlog.use-batch-mode-only-on-start (default > false) > > > > >>>>>>> > > > > >>>>>>> That option would disallow switching from streaming to batch. > Batch > > > > >>>>> mode > > > > >>>>>>> would be allowed only to get rid of the initial backlog > > > > >>> present on > > > > >>>>>>> start-up. > > > > >>>>>>> > > > > >>>>>>> That would allow us to safely experiment with switching from > streaming > > > > >>> to > > > > >>>>>> batch, > > > > >>>>>>> and I would actually be more comfortable enabling "using batch mode > > > on > > > > >>>>> start" > > > > >>>>>>> by default, until we gain confidence and feedback that > > switching > > > > >>>> back & > > > > >>>>>>> forth > > > > >>>>>>> is working as expected. > > > > >>>>>>> > > > > >>>>>> Now I understand what you are suggesting. I agree that it is > > > > >>> necessary > > > > >>>>> for > > > > >>>>>> users to be able to disallow switching from streaming to > batch. > > > > >>>>>> > > > > >>>>>> I am not sure it is necessary to introduce an extra config > just > > > for > > > > >>>> this > > > > >>>>>> purpose. The reason is that we don't have any strategy that > > > switches > > > > >>>>>> backlog status from false to true yet. And when we have such a > > > > strategy > > > > >>>>> (e.g. > > > > >>>>>> FLIP-328) in the future, it is very likely that we will > > introduce > > > > >>> extra > > > > >>>>>> config(s) for users to explicitly turn on such a feature. That > > > means > > > > >>>> users > > > > >>>>>> should be able to turn off this feature even if we don't have > > > > >>> something > > > > >>>>>> like execution.backlog.use-batch-mode-only-on-start.
> > > > >>>>>> > > > > >>>>>> Maybe we can revisit the need for such a config when we > > > > >>>> introduce/discuss > > > > >>>>>> the capability to switch backlog from false to true in the > future. > > > > >>> What > > > > >>>>> do > > > > >>>>>> you think? > > > > >>>>>> > > > > >>>>>> > > > > >>>>>>>>> Or we could limit the scope of this FLIP to only support > > > > >>> starting > > > > >>>>> with > > > > >>>>>>>>> batch mode and switching only once to > > > > >>>>>>>>> streaming, and design a follow-up with switching back and > > > forth? > > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP > > into > > > > >>>> two > > > > >>>>>>> FLIPs > > > > >>>>>>>> so that we can make incremental progress. > > > > >>>>>>> Great, let's do that. In a follow-up FLIP we could restart > the > > > > >>>>> discussion > > > > >>>>>>> about > > > > >>>>>>> switching back and forth. > > > > >>>>>>> > > > > >>>>>> Cool, I added the following statement to the motivation > section. > > > > >>>>>> > > > > >>>>>> "NOTE: this FLIP focuses only on the capability to switch from > > > batch > > > > >>> to > > > > >>>>>> stream mode. If there is any extra API needed to support > > switching > > > > >>> from > > > > >>>>>> stream to batch mode, we will discuss them in a follow-up > FLIP." > > > > >>>>>> > > > > >>>>>> I am looking forward to reading your follow-up thoughts! > > > > >>>>>> > > > > >>>>>> Best, > > > > >>>>>> Dong > > > > >>>>>> > > > > >>>>>> > > > > >>>>>>> Piotrek > > > > >>>>>>> > > > > >>>>>>> On Thu, Jul 20, 2023 at 4:57 PM Dong Lin <lindon...@gmail.com> > > > > >>> wrote: > > > > >>>>>>>> Hi Piotr, > > > > >>>>>>>> > > > > >>>>>>>> Thank you for the very detailed comments! Please see my > reply > > > > >>>> inline.
> > > > >>>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski < > > > > >>>>>>> piotr.nowoj...@gmail.com> > > > > >>>>>>>> wrote: > > > > >>>>>>>> > > > > >>>>>>>>> Hi Dong, > > > > >>>>>>>>> > > > > >>>>>>>>> I have a couple of follow up questions about switching back > > and > > > > >>>>> forth > > > > >>>>>>>>> between streaming and batching mode. > > > > >>>>>>>>> Especially around shuffle/watermark strategy, and keyed > state > > > > >>>>>> backend. > > > > >>>>>>>>> First of all, it might not always be beneficial to switch > > into > > > > >>>> the > > > > >>>>>>> batch > > > > >>>>>>>>> modes: > > > > >>>>>>>>> - Shuffle strategy > > > > >>>>>>>>> - Is sorting going to be purely in-memory? If not, > > > > >>> obviously > > > > >>>>>>> spilling > > > > >>>>>>>>> to disks might cause larger overheads > > > > >>>>>>>>> compared to not sorting the records. > > > > >>>>>>>>> > > > > >>>>>>>> Sorting might require spilling data to disk depending on the > > > > >>> input > > > > >>>>>> size. > > > > >>>>>>>> The behavior of sorting w.r.t. memory/disk is expected to be > > > > >>>> exactly > > > > >>>>>> the > > > > >>>>>>>> same as the behavior of input sorting automatically > performed > > by > > > > >>>>> Flink > > > > >>>>>>>> runtime in batch mode for keyed inputs. > > > > >>>>>>>> > > > > >>>>>>>> More specifically, ExternalSorter > > > > >>>>>>>> < > > > > >>>>>>>> > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java > > > > >>>>>>>> is > > > > >>>>>>>> currently used to sort keyed inputs in batch mode. 
>> It is automatically used by the Flink runtime in OneInputStreamTask (here <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>) and in MultiInputSortingDataInput (here <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>). We plan to re-use the same code/mechanism to do the sorting.
>>
>> It is possible that spilling to disks might cause larger overhead. IMO this is an orthogonal issue that already exists in Flink, because a Flink job running in batch mode might also be slower than in stream mode for the same reason. However, even though it is possible in theory, I expect that in practice the throughput of sorting + BatchExecutionKeyedStateBackend should be much higher than using other keyed state backends when the amount of data is large. As a matter of fact, we have not heard complaints about such performance regression issues in batch mode.
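For readers following along, the spill-then-merge behavior being referred to can be illustrated with a small, self-contained sketch. This is not Flink's ExternalSorter; runs are kept in memory here purely for brevity, where a real sorter would write them to disk:

```java
import java.util.*;

/**
 * Minimal sketch (NOT Flink's ExternalSorter) of sort-with-spilling:
 * records accumulate in a bounded in-memory buffer; when the buffer fills,
 * it is sorted and "spilled" as a sorted run, and all runs are k-way
 * merged at the end.
 */
class SpillingSorter {
    private final int bufferCapacity;
    private final List<int[]> sortedRuns = new ArrayList<>();
    private final List<Integer> buffer = new ArrayList<>();

    SpillingSorter(int bufferCapacity) { this.bufferCapacity = bufferCapacity; }

    void add(int record) {
        buffer.add(record);
        if (buffer.size() >= bufferCapacity) {
            spill(); // a real implementation would write this run to disk
        }
    }

    private void spill() {
        int[] run = buffer.stream().mapToInt(Integer::intValue).sorted().toArray();
        sortedRuns.add(run);
        buffer.clear();
    }

    /** K-way merge of all sorted runs, the final phase of an external sort. */
    List<Integer> finishSorting() {
        if (!buffer.isEmpty()) spill();
        // Heap entries are {value, runIndex, positionInRun}.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < sortedRuns.size(); r++) {
            if (sortedRuns.get(r).length > 0) heap.add(new int[]{sortedRuns.get(r)[0], r, 0});
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] e = heap.poll();
            out.add(e[0]);
            int[] run = sortedRuns.get(e[1]);
            if (e[2] + 1 < run.length) heap.add(new int[]{run[e[2] + 1], e[1], e[2] + 1});
        }
        return out;
    }
}
```

The spill threshold here plays the role that the configured managed-memory budget plays in a real sorter.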
>> The primary goal of this FLIP is to allow the operator to run at the same throughput (in stream mode when there is backlog) as it currently can in batch mode. And this goal is not affected by the disk overhead issue mentioned above.
>>
>> I am thinking maybe we can treat it as an orthogonal performance optimization problem instead of solving it in this FLIP?
>>
>>>   - If it will be at least partially in-memory, does Flink have some mechanism to reserve optional memory that can be revoked if a new operator starts up? Can this memory be redistributed? Ideally we should use as much as possible of the available memory to avoid spilling costs, while also being able to revoke that memory.
>>
>> This FLIP does not support dynamically revoking/redistributing the managed memory used by the ExternalSorter.
>>
>> For operators with isInternalSorterSupported = true, we will allocate to the operator the execution.sorted-inputs.memory <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144> amount of managed memory. This is the same as how Flink allocates managed memory to an operator with keyed inputs in batch mode.
>> Note that this FLIP intends to let operators sort their inputs whenever there is backlog. And there is currently no way for an operator to know in advance whether there will be no backlog after a given time. So it seems simpler to just keep the managed memory for such an operator throughout its lifecycle, for now.
>>
>> Besides, the lack of ability to dynamically revoke/redistribute unused managed memory seems to be an existing issue in Flink. For example, we might have two operators sharing the same slot, both of which use managed memory (e.g. to sort inputs). There is currently no way for one operator to re-use the memory not used by the other operator.
>>
>> Therefore, I think we can treat this as an orthogonal performance optimization problem which can be addressed separately. What do you think?
>>
>>> - Sometimes sorting, even if we have the memory to do it, might be an unnecessary overhead.
>>> - Watermarks
>>>   - Is holding back watermarks always good? Suppose we have tons of data buffered/sorted and waiting to be processed, with multiple windows per key and many different keys.
>>> When we switch back to `isBacklog=false` we first process all of that data before processing watermarks; for operators that are not using sorted input, the state size can explode significantly, causing lots of problems. Even for those that can use sorting, switching to sorting or BatchExecutionKeyedStateBackend is not always a good idea, but keeping RocksDB can also be risky.
>>
>> With the current FLIP, the proposal is to use a sorter only when the inputs have keys. Following this practice, operators which are not using sorting should have un-keyed inputs. I believe such an operator will not even use a keyed state backend. Maybe I missed some use-case. Can you provide a use-case where we would have an operator with un-keyed inputs whose state size can explode because we hold back watermarks?
>>
>> For operators with keyed inputs that use sorting, I suppose it is possible that sorting + BatchExecutionKeyedStateBackend can be worse than using RocksDB. But I believe this is very rare (if possible at all) in practical usage of Flink.
>> Taking one step back: if this indeed causes a regression for a real use-case, the user can set execution.checkpointing.interval-during-backlog to anything other than 0, so that this FLIP will not use the sorter + BatchExecutionKeyedStateBackend even when there is backlog.
>>
>> I would hope we can find a way to automatically determine whether sorting + BatchExecutionKeyedStateBackend is better or worse than using RocksDB alone. But I could not find a good and reliable way to do this. Maybe we can update Flink accordingly once we find a good way in the future?
>>
>>> - Keyed state backend
>>>   - I think you haven't described what happens during switching from streaming to backlog processing.
>>
>> Good point. This indeed needs to be described. I added a TODO in the "Behavior changes ..." section to describe what happens when isBacklog switches from false to true, for all of watermark/checkpoint/statebackend etc. Let me explain this for the state backend here for now. I will update the FLIP later.
>> When isBacklog switches from false to true, an operator with keyed inputs can optionally (as determined by its implementation) start to use an internal sorter to sort inputs by key, without processing inputs or updating the state backend, until it receives end-of-inputs or isBacklog switches to false again.
>>
>>> - The switch can be an unnecessary overhead.
>>
>> I agree it can cause unnecessary overhead, particularly when isBacklog switches back and forth frequently. Whether or not this is unnecessary likely depends on the duration/throughput of the backlog phase as well as the specific computation logic of the operator. I am not sure there is a good way for Flink to determine in advance whether switching is unnecessary.
>>
>> Note that for the existing use-cases where we expect isBacklog to change to true (e.g. the MySQL CDC snapshot phase, or the Kafka source watermark lag being too high), we don't expect isBacklog to switch back and forth frequently. And the user can disable this switch by setting execution.checkpointing.interval-during-backlog to anything other than 0.
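The switching behavior described above can be sketched with a toy model. Everything here, class and method names included, is hypothetical and is not the Flink operator API; it only illustrates "buffer and sort during backlog, drain on switch-back or end-of-inputs":

```java
import java.util.*;

/** Hypothetical model (not Flink API) of an operator that sorts its keyed
 *  input internally while isBacklog=true, and processes records
 *  immediately otherwise. */
class BacklogAwareOperator {
    private boolean isBacklog = false;
    // Internal "sorter": records buffered (not processed, not written to state) during backlog.
    private final List<Map.Entry<String, Integer>> backlogBuffer = new ArrayList<>();
    final List<String> processed = new ArrayList<>(); // stands in for downstream output

    void setIsBacklog(boolean backlog) {
        if (this.isBacklog && !backlog) {
            drainSorted(); // isBacklog flipped true -> false: process buffered records in key order
        }
        this.isBacklog = backlog;
    }

    void processRecord(String key, int value) {
        if (isBacklog) {
            backlogBuffer.add(Map.entry(key, value));
        } else {
            processed.add(key + "=" + value);
        }
    }

    void endInput() { drainSorted(); } // end-of-inputs also triggers the drain

    private void drainSorted() {
        backlogBuffer.sort(Map.Entry.comparingByKey()); // sort by key, like the internal sorter
        for (Map.Entry<String, Integer> e : backlogBuffer) processed.add(e.getKey() + "=" + e.getValue());
        backlogBuffer.clear();
    }
}
```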
>> Therefore, I am wondering if we can also view this as a performance optimization opportunity for extra use-cases in the future, rather than a blocking issue of this FLIP for the MVP use-cases (e.g. the snapshot phase of any CDC source, Kafka watermark lag).
>>
>>> At the same time, in your current proposal, for `execution.checkpointing.interval-during-backlog > 0` we won't switch to "batch" mode at all. That's a bit of a shame; I don't understand why those two things should be coupled together?
>>
>> We can in general classify optimizations into those that are compatible with checkpointing and those that are not. For example, input sorting is currently not compatible with checkpointing, whereas buffering input records to reduce state backend overhead (and probably columnar processing for mini-batches in the future) is compatible with checkpointing.
>>
>> The primary goal of FLIP-327 is to support optimizations that are not compatible with checkpointing. If execution.checkpointing.interval-during-backlog > 0, which means the user intends to still do checkpointing even when there is backlog, then we will not be able to support such optimizations.
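The coupling described above boils down to a small predicate. The following is an illustrative sketch (the names are invented, not Flink internals): checkpoint-incompatible, batch-style optimizations are usable only when interval-during-backlog is 0 and every input currently reports isBacklog=true:

```java
/** Hypothetical sketch of the decision described above: batch-style
 *  (checkpoint-incompatible) optimizations are only usable when the user
 *  has opted out of checkpointing during backlog (interval == 0) and
 *  every input of the operator currently reports isBacklog=true. */
class BacklogOptimizationPolicy {
    static boolean canUseBatchStyleOptimizations(long intervalDuringBacklogMs, boolean[] inputIsBacklog) {
        if (intervalDuringBacklogMs != 0) {
            return false; // user still wants checkpoints during backlog; sorting etc. would break them
        }
        for (boolean backlog : inputIsBacklog) {
            if (!backlog) {
                return false; // at least one input is already real-time; keep checkpointing semantics
            }
        }
        return true;
    }
}
```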
>> For optimizations that are compatible with checkpointing, we can apply them even when the operator does not run in "batch mode". There are extra problems to solve in order to achieve this, such as supporting unaligned checkpointing without prolonging its sync phase. I plan to explain how this can be done in FLIP-325.
>>
>>> All in all, shouldn't we aim for some more clever process of switching back and forth between streaming/batch modes for the watermark strategy/state backend/sorting, based on some metrics? Trying to either predict if switching might help, or trying to estimate if the last switch was beneficial? Maybe something along the lines of:
>>> - sort only in memory, and during sorting count the number of distinct keys (NDK)
>>>   - maybe allow for spilling if so far in memory we have NDK * 5 <= #records
>>> - do not allow buffering records above a certain threshold, as otherwise checkpointing can explode
>>> - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 <= #records
>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
>>>
>>> Or maybe for starters something even simpler, and then test out something more fancy as a follow up?
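As a rough illustration only, the NDK-based heuristic suggested above could be expressed as pure functions. The thresholds are the examples from the discussion, not tuned values, and the rules shown are a best-effort reading of the proposal, not Flink behavior:

```java
/** Illustrative sketch only: a low distinct-key count (NDK) relative to the
 *  record count means many records per key, which is when sorting and
 *  batch-style key-by-key processing pay off. Thresholds are examples. */
class SortingHeuristic {
    /** Example rule: allow spilling sorted runs to disk only when keys repeat a lot. */
    static boolean maySpillToDisk(long numDistinctKeys, long numRecords) {
        return numDistinctKeys * 5 <= numRecords;
    }

    /** Example rule: switch to a batch-style keyed state backend only when
     *  each key has, on average, at least two records. */
    static boolean shouldUseBatchKeyedStateBackend(long numDistinctKeys, long numRecords) {
        return numDistinctKeys * 2 <= numRecords;
    }
}
```

A real implementation would feed these predicates from runtime metrics (or an EMA of NDK, as suggested) rather than exact counts.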
>> I agree it is worth investigating these ideas to further optimize the performance during backlog.
>>
>> I just think these can be done independently after this FLIP. The focus of this FLIP is to re-use in stream mode the same optimization which we already use in batch mode, rather than inventing new optimizations or improving the performance of the existing ones.
>>
>> Given that there are already a lot of new mechanisms/features to discuss and address in this FLIP, I am hoping we can limit its scope to re-using the existing optimization, and leave these extra optimization opportunities as future work.
>>
>> What do you think?
>>
>>> At the same time, `execution.checkpointing.interval-during-backlog=0` seems a weird setting to me, one that I would not feel safe recommending to anyone. If processing of a backlog takes a long time, a job might stop making any progress due to some random failures. It is especially dangerous if a job switches from streaming mode back to backlog processing for some reason, as that could happen months after someone started a job with this strange setting. So should we even have it? I would simply disallow it. I
>>
>> Good point.
>> I do agree we need further work to improve the failover performance in case any task fails.
>>
>> As of the current FLIP, if any task fails during backlog and execution.checkpointing.interval-during-backlog = 0, we will need to restart all operators from the last checkpointed state and continue processing the backlog. This can mean a lot of rollback, since there is no checkpoint during backlog. It can also be worse than batch mode, since this FLIP currently does not support exporting/saving records to local disk (or a shuffle service), so a failed task cannot re-consume the records from the upstream task (or shuffle service) in the same way Flink fails over a task in batch mode.
>>
>> I think we can extend this FLIP to solve this problem so that it has at least the same behavior/performance as a batch-mode job. The idea is to also follow what batch mode does. For example, we can trigger a checkpoint when isBacklog switches to true, and every operator should buffer its output on the TM local disk (or a remote shuffle service). Then, after a task fails, it can restart from the last checkpoint and re-consume the data buffered in the upstream task.
>>
>> I will update the FLIP as described above. Would this address your concern?
>>> could see a power setting like: `execution.backlog.use-full-batch-mode-on-start` (default false)
>>
>> I am not sure I fully understand this config or its motivation. Can you help explain the exact semantics of this config?
>>
>>> that would override any heuristic of switching to backlog if someone is submitting a new job that starts with `isBacklog=true`.
>>>
>>> Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow up with switching back and forth?
>>
>> Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.
>>
>> Best,
>> Dong
>>
>>> I'm looking forward to hearing/reading your thoughts.
>>>
>>> Best,
>>> Piotrek
>>>
>>> śr., 12 lip 2023 o 12:38 Jing Ge <j...@ververica.com.invalid> napisał(a):
>>>
>>>> Hi Dong,
>>>>
>>>> Thanks for your reply!
>>>>
>>>> Best regards,
>>>> Jing
>>>>
>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hi Jing,
>>>>>
>>>>> Thanks for the comments. Please see my reply inline.
>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid> wrote:
>>>>>
>>>>>> Hi Dong,
>>>>>>
>>>>>> Thanks for the clarification. Now it is clear to me. I got additional noob questions wrt the internal sorter.
>>>>>>
>>>>>> 1. When should the setter be called to set internalSorterSupported to true?
>>>>>
>>>>> The developer of the operator class (i.e. those classes which implement `StreamOperator`) should override the `#getOperatorAttributes()` API to set internalSorterSupported to true, if he/she decides to sort records internally in the operator.
>>>>>
>>>>>> 2.
>>>>>> *"For those operators whose throughput can be considerably improved with an internal sorter, update them to take advantage of the internal sorter when their input has isBacklog=true. Typically, operators that involve an aggregation operation (e.g. join, cogroup, aggregate) on keyed inputs can benefit from using an internal sorter."*
>>>>>>
>>>>>> *"The operator that performs the CoGroup operation will instantiate two internal sorters to sort records from its two inputs separately.
>>>>>> Then it can pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead."*
>>>>>>
>>>>>> According to the performance test, it seems that the internal sorter has better performance than the external sorter. Is it possible to make those operators that can benefit from it use the internal sorter by default?
>>>>>
>>>>> Yes, it is possible. After this FLIP is done, users can use DataStream#coGroup with EndOfStreamWindows as the window assigner to co-group two streams in effectively the batch manner. An operator that uses an internal sorter will be used to perform the co-group operation. There is no need for users of the DataStream API to explicitly know about or set the internal sorter in any way.
>>>>>
>>>>> In the future, we plan to incrementally optimize other aggregation operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows is used as the window assigner.
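The two-sorter co-group described in the quoted FLIP text can be modeled with a simplified, self-contained sketch (not Flink's operator code): each input is grouped/sorted by key independently and the results are merged per key, with no TaggedUnion wrapper:

```java
import java.util.*;

/** Simplified model (not Flink code) of co-grouping two keyed inputs with
 *  two independent sorters: sort/group each input by key, then walk the
 *  union of keys in order and emit one group per key, avoiding a combined
 *  TaggedUnion sort. */
class TwoSorterCoGroup {
    static List<String> coGroup(List<Map.Entry<String, Integer>> left,
                                List<Map.Entry<String, Integer>> right) {
        // Each "internal sorter" sorts/groups one input by key.
        TreeMap<String, List<Integer>> l = groupByKey(left);
        TreeMap<String, List<Integer>> r = groupByKey(right);
        // Union of keys in sorted order, emitting both sides for each key.
        TreeSet<String> keys = new TreeSet<>(l.keySet());
        keys.addAll(r.keySet());
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            out.add(k + ": left=" + l.getOrDefault(k, List.of())
                      + " right=" + r.getOrDefault(k, List.of()));
        }
        return out;
    }

    private static TreeMap<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> in) {
        TreeMap<String, List<Integer>> m = new TreeMap<>();
        for (Map.Entry<String, Integer> e : in) {
            m.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return m;
    }
}
```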
>>>>> Best,
>>>>> Dong
>>>>>
>>>>>> Best regards,
>>>>>> Jing
>>>>>>
>>>>>> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jing,
>>>>>>>
>>>>>>> Thank you for the comments! Please see my reply inline.
>>>>>>>
>>>>>>> On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid> wrote:
>>>>>>>
>>>>>>>> Hi Dong,
>>>>>>>>
>>>>>>>> Thanks for the proposal! The FLIP is already in good shape. I got some NIT questions.
>>>>>>>>
>>>>>>>> 1. It is a little bit weird to write the hint right after the motivation that some features have been moved to FLIP-331, because at that point readers don't know the context about which features are meant. I would suggest moving the note to the beginning of the "Public interfaces" section.
>>>>>>>
>>>>>>> Given that the reviewer who commented on this email thread before I refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler to just remove any mention of FLIP-331. I have updated the FLIP accordingly.
>>>>>>>> 2. It is also a little bit weird to describe all behaviour changes at first but then only focus on one single feature, i.e. how to implement internalSorterSupported. TBH, I was lost while I was reading the Public interfaces. Maybe change the FLIP title? Another option could be to write a short summary of all features and point out that this FLIP will only focus on the internalSorterSupported feature; the others could be found in FLIP-331. WDYT?
>>>>>>>
>>>>>>> Conceptually, the purpose of this FLIP is to allow a stream mode job to run parts of the topology in batch mode so that it can apply optimizations/computations that cannot be used together with checkpointing (and are thus not usable in stream mode). Although the internal sorter is the only optimization immediately supported in this FLIP, this FLIP lays the foundation to support other optimizations in the future, such as using a GPU to process a bounded stream of records.
>>>>>>> Therefore, I find it better to keep the current title rather than limiting the scope to the internal sorter. What do you think?
>>>>>>>
>>>>>>>> 3. There should be a typo at 4) Checkpoint and failover strategy -> Mixed mode ->
>>>>>>>>
>>>>>>>> - If any task fails when isBacklog=false true, this task is restarted to re-process its input from the beginning.
>>>>>>>
>>>>>>> Thank you for catching this issue. It is fixed now.
>>>>>>>
>>>>>>> Best,
>>>>>>> Dong
>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Jing
>>>>>>>>
>>>>>>>> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Piotr,
>>>>>>>>>
>>>>>>>>> Thanks for your comments! Please see my reply inline.
>>>>>>>>>
>>>>>>>>> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Dong,
>>>>>>>>>>
>>>>>>>>>> I have a couple of questions.
>>>>>>>>>> Could you explain why those properties
>>>>>>>>>>
>>>>>>>>>>     @Nullable private Boolean isOutputOnEOF = null;
>>>>>>>>>>     @Nullable private Boolean isOutputOnCheckpoint = null;
>>>>>>>>>>     @Nullable private Boolean isInternalSorterSupported = null;
>>>>>>>>>>
>>>>>>>>>> must be `@Nullable`, instead of having the default value set to `false`?
>>>>>>>>>
>>>>>>>>> By initializing these private variables in OperatorAttributesBuilder as null, we can implement `OperatorAttributesBuilder#build()` in such a way that it can print DEBUG-level logging to say "isOutputOnCheckpoint is not explicitly set". This can help users/SREs debug performance issues (or the lack of an expected optimization) caused by operators not explicitly setting the right operator attribute.
>>>>>>>>>
>>>>>>>>> For example, we might want a job to always use the longer checkpointing interval (i.e.
>>>>>>>>> execution.checkpointing.interval-during-backlog) if all running operators have isOutputOnCheckpoint==false, and use the short checkpointing interval otherwise. If a user has explicitly configured execution.checkpointing.interval-during-backlog but the two-phase commit sink library has not been upgraded to set isOutputOnCheckpoint=true, then the job will end up using the long checkpointing interval, and it will be useful to figure out what is going wrong in this case by checking the log.
>>>>>>>>>
>>>>>>>>> Note that the default value of these fields in the OperatorAttributes instance built by OperatorAttributesBuilder will still be false. The following is mentioned in the Java doc of `OperatorAttributesBuilder#build()`:
>>>>>>>>>
>>>>>>>>>     /**
>>>>>>>>>      * If any operator attribute is null, we will log it at DEBUG level
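The pattern being explained can be sketched in a self-contained way (illustrative only; the names mirror the FLIP, but this is not the actual Flink implementation): nullable builder fields distinguish "not explicitly set" from "explicitly false", while the built attributes still default to false:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch (not the actual Flink classes) of why the builder
 *  fields are @Nullable: null means "not explicitly set", which build()
 *  can report for debugging, while the built attribute still defaults
 *  to false. */
class OperatorAttributesBuilderSketch {
    private Boolean isOutputOnEOF = null;            // null = not explicitly set
    private Boolean isOutputOnCheckpoint = null;
    final List<String> debugLog = new ArrayList<>(); // stands in for DEBUG-level logging

    OperatorAttributesBuilderSketch setOutputOnEOF(boolean v) { this.isOutputOnEOF = v; return this; }
    OperatorAttributesBuilderSketch setOutputOnCheckpoint(boolean v) { this.isOutputOnCheckpoint = v; return this; }

    /** Returns {isOutputOnEOF, isOutputOnCheckpoint}, logging any unset attribute. */
    boolean[] build() {
        if (isOutputOnEOF == null) debugLog.add("isOutputOnEOF is not explicitly set");
        if (isOutputOnCheckpoint == null) debugLog.add("isOutputOnCheckpoint is not explicitly set");
        // Unset attributes still default to false in the built instance.
        return new boolean[]{
            isOutputOnEOF != null && isOutputOnEOF,
            isOutputOnCheckpoint != null && isOutputOnCheckpoint
        };
    }
}
```

With plain `boolean` fields defaulting to false, the builder could not tell "not set" apart from "set to false", so the debug message described above would be impossible.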