Hi Dong, The Operators API is unfortunately also our public-facing API, and I mean that the APIs we will add there should also be marked `@Experimental` IMO.
The config options should also be marked as experimental (both annotated with @Experimental and noted the same way in the docs, if the @Experimental annotation is not automatically mentioned in the docs).
> Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?
We should do this independently from marking the APIs/config options as `@Experimental`.
Best,
Piotrek
On Fri, Aug 11, 2023 at 14:55 Dong Lin <lindon...@gmail.com> wrote:
> Hi Piotr,
> Thanks for the reply!
> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > Hi,
> > Sorry for the long delay in responding!
> > > Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?
> > I think it's fine. It would be best to mark this feature as experimental, and we say that the config keys or the default values might change in the future.
> In general I agree we can mark APIs that determine "whether to enable dynamic switching between stream/batch mode" as experimental.
> However, I am not sure we have such an API yet. The APIs added in this FLIP are intended to be used by operator developers rather than end users. End users can enable this capability by setting execution.checkpointing.interval-during-backlog = Long.MAX and using a source which might implicitly set the backlog status (e.g. HybridSource). So execution.checkpointing.interval-during-backlog is the only user-facing API that can always control whether this feature can be used.
> However, execution.checkpointing.interval-during-backlog itself is not tied to FLIP-327.
> Do you mean we should set checkpointing.interval-during-backlog as experimental?
> Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?
> Best,
> Dong
> > > Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?
> > Sure, we can do that.
> > Best,
> > Piotrek
> > On Sun, Jul 23, 2023 at 14:32 Dong Lin <lindon...@gmail.com> wrote:
> > > Hi Piotr,
> > > Thanks a lot for the explanation. Please see my reply inline.
> > > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > Hi Dong,
> > > > Thanks a lot for the answers. I can now only briefly answer your last email.
> > > > > It is possible that spilling to disks might cause larger overhead. IMO it is an orthogonal issue already existing in Flink. This is because a Flink job running batch mode might also be slower than its throughput in stream mode due to the same reason.
> > > > Yes, I know, but the thing that worries me is that previously only a user alone could decide whether to use batch mode or streaming, and in practice one user would rarely (if ever) use both for the same problem/job/query. If his intention was to eventually process live data, he was using streaming even if there was a large backlog at the start (apart from some very few power users).
> > > > With this change, we want to introduce a mode that would be switching back and forth between streaming and "batch in streaming" automatically. So a potential performance regression would be much more visible and painful at the same time.
> > > > If a batch query runs slower than it could, it's kind of fine as it will end at some point. If a streaming query during large backpressure (maybe a temporary load spike) switches to batch processing, that's a bigger deal. Especially if the batch processing mode will not be able to actually even handle the normal load after the load spike. In that case, the job could never recover from the backpressure/backlog mode.
> > > I understand you are concerned with the risk of performance regression introduced due to switching to batch mode.
> > > After thinking about this more, I think this existing proposal meets the minimum requirement of "not introducing regression for existing jobs". The reason is that even if batch mode can be slower than stream mode for some operators in some cases, this is an optional feature that will only be enabled if a user explicitly overrides the newly introduced config to non-default values. Existing jobs that simply upgrade their Flink library version will not suffer any performance regression.
> > > More specifically, in order to switch to batch mode, users will need to explicitly set execution.checkpointing.interval-during-backlog to 0. And users can always explicitly update execution.checkpointing.interval-during-backlog to turn off the batch mode if that incurs any performance issue.
> > > As far as I can tell, for all practical workloads we see in production jobs, batch mode is always faster (w.r.t. throughput) than stream mode when there is a high backlog of incoming records. Though it is still theoretically possible, it should be very rare (if ever) for batch mode to be slower in practice.
> > > Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?
> > > > > execution.backlog.use-full-batch-mode-on-start (default false)
> > > > Oops, sorry, it was supposed to be sth like:
> > > > execution.backlog.use-batch-mode-only-on-start (default false)
> > > > That option would disallow switching from streaming to batch. Batch mode would be allowed only to get rid of the initial backlog present on start-up.
> > > > That would allow us to safely experiment with switching from streaming to batch, and I would actually be more comfortable enabling "using batch mode on start" by default, until we gain confidence and feedback that switching back & forth is working as expected.
> > > Now I understand what you are suggesting. I agree that it is necessary for users to be able to disallow switching from streaming to batch.
> > > I am not sure it is necessary to introduce an extra config just for this purpose. The reason is that we don't have any strategy that switches backlog status from false to true yet. And when we have such a strategy (e.g. FLIP-328) in the future, it is very likely that we will introduce extra config(s) for users to explicitly turn on such a feature. That means users should be able to turn off this feature even if we don't have something like execution.backlog.use-batch-mode-only-on-start.
> > > Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?
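To make the knob being discussed concrete, the settings would look roughly like this in flink-conf.yaml (a sketch based only on the config keys mentioned in this thread; the exact value syntax may differ between Flink versions):

```yaml
# Regular checkpointing interval, used while the sources report isBacklog=false.
execution.checkpointing.interval: 3min

# Checkpointing interval used while a source reports isBacklog=true.
# Setting it to 0 disables checkpointing during backlog, which is what
# allows the batch-style optimizations discussed in this thread.
execution.checkpointing.interval-during-backlog: 0
```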
> > > > > > Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow up with switching back and forth?
> > > > > Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.
> > > > Great, let's do that. In a follow up FLIP we could restart the discussion about switching back and forth.
> > > Cool, I added the following statement to the motivation section.
> > > "NOTE: this FLIP focuses only on the capability to switch from batch to stream mode. If there is any extra API needed to support switching from stream to batch mode, we will discuss them in a follow-up FLIP."
> > > I am looking forward to reading your follow-up thoughts!
> > > Best,
> > > Dong
> > > > Piotrek
> > > > On Thu, Jul 20, 2023 at 16:57 Dong Lin <lindon...@gmail.com> wrote:
> > > > > Hi Piotr,
> > > > > Thank you for the very detailed comments! Please see my reply inline.
> > > > > On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > > > Hi Dong,
> > > > > > I have a couple of follow up questions about switching back and forth between streaming and batching mode. Especially around shuffle/watermark strategy, and keyed state backend.
> > > > > > First of all, it might not always be beneficial to switch into batch mode:
> > > > > > - Shuffle strategy
> > > > > >   - Is sorting going to be purely in-memory? If not, obviously spilling to disks might cause larger overheads compared to not sorting the records.
> > > > > > > > > > > > > > > > Sorting might require spilling data to disk depending on the input > > > size. > > > > > The behavior of sorting w.r.t. memory/disk is expected to be > exactly > > > the > > > > > same as the behavior of input sorting automatically performed by > > Flink > > > > > runtime in batch mode for keyed inputs. > > > > > > > > > > More specifically, ExternalSorter > > > > > < > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java > > > > > > > > > > > is > > > > > currently used to sort keyed inputs in batch mode. It is > > automatically > > > > used > > > > > by Flink runtime in OneInputStreamTask (here > > > > > < > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114 > > > > > >) > > > > > and in MultiInputSortingDataInput (here > > > > > < > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188 > > > > > >). > > > > > We plan to re-use the same code/mechanism to do sorting. > > > > > > > > > > It is possible that spilling to disks might cause larger overhead. > > IMO > > > it > > > > > is an orthogonal issue already existing in Flink. This is because a > > > Flink > > > > > job running batch mode might also be slower than its throughput in > > > stream > > > > > mode due to the same reason. However, even though it is possible in > > > > theory, > > > > > I expect that in practice the throughput of using sorting + > > > > > BatchExecutionKeyedStateBackend should be much higher than using > > other > > > > > keyed statebackends when the amount of data is large. 
As a matter of fact, we have not heard complaints about such performance regression issues in batch mode.
> > > > > The primary goal of this FLIP is to allow the operator to run at the same throughput (in stream mode when there is backlog) as it can currently do in batch mode. And this goal is not affected by the disk overhead issue mentioned above.
> > > > > I am thinking maybe we can treat it as an orthogonal performance optimization problem instead of solving this problem in this FLIP?
> > > > > >   - If it will be at least partially in-memory, does Flink have some mechanism to reserve optional memory that can be revoked if a new operator starts up? Can this memory be redistributed? Ideally we should use as much as possible of the available memory to avoid spilling costs, but also being able to revoke that memory
> > > > > This FLIP does not support dynamically revoking/redistributing managed memory used by the ExternalSorter.
> > > > > For operators with isInternalSorterSupported = true, we will allocate to this operator execution.sorted-inputs.memory <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144> amount of managed memory. This is the same as how Flink allocates managed memory to an operator when this operator has keyed inputs in batch mode.
> > > > > Note that this FLIP intends to support operators sorting their inputs whenever there is backlog. And there is currently no way for an operator to know in advance whether there will be no backlog after a given time.
So it > > > seems > > > > > simpler to just keep managed memory for such an operator throughout > > the > > > > > lifecycle of this operator, for now. > > > > > > > > > > Besides, it seems that the lack of ability to dynamically > > > > > revoke/redistribute un-used managed memory is an existing issue in > > > Flink. > > > > > For example, we might have two operators sharing the same slot and > > > these > > > > > two operators both use managed memory (e.g. to sort inputs). There > is > > > > > currently no way for one operator to re-use the memory not used by > > the > > > > > other operator. > > > > > > > > > > Therefore, I think we can treat this as an orthogonal performance > > > > > optimization problem which can be addressed separately. What do you > > > > think? > > > > > > > > > > > > > > > > - Sometimes sorting, even if we have memory to do that, might > > be > > > an > > > > > > unnecessary overhead. > > > > > > - Watermarks > > > > > > - Is holding back watermarks always good? If we have tons of > > data > > > > > > buffered/sorted and waiting to be processed > > > > > > with multiple windows per key and many different keys. > When > > we > > > > > > switch back to `isBacklog=false` we > > > > > > first process all of that data before processing > watermarks, > > > for > > > > > > operators that are not using sorted input the > > > > > > state size can explode significantly causing lots of > > problems. > > > > > Even > > > > > > for those that can use sorting, switching to > > > > > > sorting or BatchExecutionKeyedStateBackend is not always a > > > good > > > > > > idea, but keeping RocksDB also can be > > > > > > risky. > > > > > > > > > > > > > > > > With the current FLIP, the proposal is to use a sorter only when > the > > > > inputs > > > > > have keys. According to this practice, operators which are not > using > > > > > sorting should have un-keyed inputs. I believe such an operator > will > > > not > > > > > even use a keyed state backend. 
Maybe I missed some use-case. Can you provide a use-case where we will have an operator with un-keyed inputs whose state size can explode due to us holding back watermarks?
> > > > > For operators with keyed inputs that use sorting, I suppose it is possible that sorting + BatchExecutionKeyedStateBackend can be worse than using RocksDB. But I believe this is very, very rare (if possible at all) in any practical usage of Flink.
> > > > > Taking one step back, if this indeed causes a regression for a real use-case, users can set execution.checkpointing.interval-during-backlog to anything other than 0 so that this FLIP will not use sorter + BatchExecutionKeyedStateBackend even when there is backlog.
> > > > > I would hope we can find a way to automatically determine whether using sorting + BatchExecutionKeyedStateBackend can be better or worse than using RocksDB alone. But I could not find a good and reliable way to do this. Maybe we can update Flink to do this when we find a good way in the future?
> > > > > > - Keyed state backend
> > > > > >   - I think you haven't described what happens during switching from streaming to backlog processing.
> > > > > Good point. This indeed needs to be described. I added a TODO in the "Behavior changes ..." section to describe what happens when isBacklog switches from false to true, for all watermark/checkpoint/statebackend etc.
> > > > > Let me explain this for the state backend here for now. I will update the FLIP later.
> > > > > When isBacklog switches from false to true, an operator with keyed inputs can optionally (as determined by its implementation) start to use an internal sorter to sort inputs by key, without processing inputs or updating the state backend, until it receives end-of-inputs or isBacklog is switched to false again.
> > > > > > - Switch can be an unnecessary overhead.
> > > > > I agree it can cause unnecessary overhead, particularly when isBacklog switches back and forth frequently. Whether or not this is unnecessary likely depends on the duration/throughput of the backlog phase as well as the specific computation logic of the operator. I am not sure there is a good way for Flink to determine in advance whether switching is unnecessary.
> > > > > Note that for the existing use-cases where we expect isBacklog to change to true (e.g. the MySQL CDC snapshot phase, the Kafka source watermark lag being too high), we don't expect the backlog status to switch back and forth frequently. And users can disable this switch by setting execution.checkpointing.interval-during-backlog to anything other than 0.
> > > > > Therefore, I am wondering if we can also view this as a performance optimization opportunity for extra use-cases in the future, rather than a blocking issue of this FLIP for the MVP use-case (e.g. the snapshot phase for any CDC source, Kafka watermark lag).
> > > > > > At the same time, in your current proposal, for `execution.checkpointing.interval-during-backlog > 0` we won't switch to "batch" mode at all.
That's a bit of a shame, I don't understand why those two things should be coupled together?
> > > > > We can in general classify optimizations as those that are compatible with checkpointing, and those that are not compatible with checkpointing. For example, input sorting is currently not compatible with checkpointing. And buffering input records to reduce state backend overhead (and probably columnar processing for mini-batch in the future) is compatible with checkpointing.
> > > > > The primary goal of FLIP-327 is to support optimizations that are not compatible with checkpointing. If execution.checkpointing.interval-during-backlog > 0, which means that the user intends to still do checkpointing even when there is backlog, then we will not be able to support such optimizations.
> > > > > For optimizations that are compatible with checkpointing, we can do this even when the operator does not run in "batch mode". There are extra problems to solve in order to achieve this optimization, such as supporting unaligned checkpointing without prolonging its sync phase. I plan to explain how this can be done in FLIP-325.
> > > > > > All in all, shouldn't we aim for some more clever process of switching back and forth between streaming/batch modes for watermark strategy/state backend/sorting based on some metrics? Trying to either predict if switching might help, or trying to estimate if the last switch was beneficial?
Maybe > > > > something > > > > > > along the lines: > > > > > > - sort only in memory and during sorting count the number of > > distinct > > > > > keys > > > > > > (NDK) > > > > > > - maybe allow for spilling if so far in memory we have NDK * > 5 > > >= > > > > > > #records > > > > > > - do not allow to buffer records above a certain threshold, as > > > > otherwise > > > > > > checkpointing can explode > > > > > > - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 >= > > > > #records > > > > > > - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records > > > > > > > > > > > > Or even maybe for starters something even simpler and then test > out > > > > > > something more fancy as a follow up? > > > > > > > > > > > > > > > > I agree it is worth investigating these ideas to further optimize > the > > > > > performance during backlog. > > > > > > > > > > I just think these can be done independently after this FLIP. The > > focus > > > > of > > > > > this FLIP is to re-use in stream mode the same optimization which > we > > > > > already use in batch mode, rather than inventing or improving the > > > > > performance of these existing optimizations. > > > > > > > > > > Given that there are already a lot of new mechanism/features to > > discuss > > > > and > > > > > address in this FLIP, I am hoping we can limit the scope of this > FLIP > > > to > > > > > re-use the existing optimization, and do these extra optimization > > > > > opportunities as future work. > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > > > At the same time, > > `execution.checkpointing.interval-during-backlog=0` > > > > > seems > > > > > > a weird setting to me, that I would > > > > > > not feel safe recommending to anyone. If processing of a backlog > > > takes > > > > a > > > > > > long time, a job might stop making > > > > > > any progress due to some random failures. 
> > > > > > Especially dangerous if a job switches from streaming mode back to backlog processing for some reason, as that could happen months after someone started a job with this strange setting. So should we even have it? I would simply disallow it. I
> > > > > Good point. I do agree we need further work to improve the failover performance in case any task fails.
> > > > > As of the current FLIP, if any task fails during backlog and execution.checkpointing.interval-during-backlog = 0, we will need to restart all operators to the last checkpointed state and continue processing the backlog. And this can be a lot of rollback since there is no checkpoint during backlog. And this can also be worse than batch, since this FLIP currently does not support exporting/saving records to local disk (or a shuffle service) so that a failed task can re-consume the records from the upstream task (or shuffle service) in the same way as Flink fails over a task in batch mode.
> > > > > I think we can extend this FLIP to solve this problem so that it has at least the same behavior/performance as a batch-mode job. The idea is to also follow what batch mode does. For example, we can trigger a checkpoint when isBacklog switches to true, and every operator should buffer its output on the TM local disk (or a remote shuffle service). Therefore, after a task fails, it can restart from the last checkpoint and re-consume data buffered in the upstream task.
> > > > > I will update the FLIP as described above. Would this address your concern?
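The buffer-while-backlog behavior that this failover discussion builds on (sort and hold records while isBacklog=true, drain them in sorted order once it flips back to false) can be illustrated with a toy, non-Flink sketch; the class and method names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy sketch (not Flink API) of the operator behavior described
// in this thread: while isBacklog=true the operator buffers records internally
// instead of processing them; when isBacklog switches back to false the
// buffer is drained in sorted order so records can be processed key by key.
public class BacklogAwareBuffer {

    private final List<String> buffer = new ArrayList<>();
    private boolean backlog = false;

    // Invoked when the operator observes a change of the backlog status.
    // Returns the records to process now (sorted); the list is non-empty only
    // when switching from backlog processing back to real-time processing.
    public List<String> setBacklog(boolean isBacklog) {
        List<String> drained = new ArrayList<>();
        if (this.backlog && !isBacklog) {
            buffer.sort(Comparator.naturalOrder());
            drained.addAll(buffer);
            buffer.clear();
        }
        this.backlog = isBacklog;
        return drained;
    }

    // Returns the record to process immediately, or null if it was buffered
    // for later sorted processing because the input is currently backlogged.
    public String processElement(String record) {
        if (backlog) {
            buffer.add(record);
            return null;
        }
        return record;
    }
}
```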
> > > > > > could see a power setting like:
> > > > > > `execution.backlog.use-full-batch-mode-on-start (default false)`
> > > > > I am not sure I fully understand this config or its motivation. Can you help explain the exact semantics of this config?
> > > > > > that would override any heuristic of switching to backlog if someone is submitting a new job that starts with `isBacklog=true`.
> > > > > > Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow up with switching back and forth?
> > > > > Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.
> > > > > Best,
> > > > > Dong
> > > > > > I'm looking forward to hearing/reading your thoughts.
> > > > > > Best,
> > > > > > Piotrek
> > > > > > On Wed, Jul 12, 2023 at 12:38 Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > > > Hi Dong,
> > > > > > > Thanks for your reply!
> > > > > > > Best regards,
> > > > > > > Jing
> > > > > > > On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > Hi Jing,
> > > > > > > > Thanks for the comments. Please see my reply inline.
> > > > > > > > On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > > > > > Hi Dong,
> > > > > > > > > Thanks for the clarification. Now it is clear to me. I got additional noob questions wrt the internal sorter.
> > > > > > > > > 1. When to call the setter to set internalSorterSupported to true?
> > > > > > > > The developer of the operator class (i.e. those classes which implement `StreamOperator`) should override the `#getOperatorAttributes()` API to set internalSorterSupported to true, if he/she decides to sort records internally in the operator.
> > > > > > > > > 2. *"For those operators whose throughput can be considerably improved with an internal sorter, update it to take advantage of the internal sorter when its input has isBacklog=true.* *Typically, operators that involve an aggregation operation (e.g. join, cogroup, aggregate) on keyed inputs can benefit from using an internal sorter."*
> > > > > > > > > *"The operator that performs the CoGroup operation will instantiate two internal sorters to sort records from its two inputs separately. Then it can pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead."*
> > > > > > > > > According to the performance test, it seems that the internal sorter has better performance than the external sorter.
Is it possible to make those operators that can benefit from it use the internal sorter by default?
> > > > > > > > Yes, it is possible. After this FLIP is done, users can use DataStream#coGroup with EndOfStreamWindows as the window assigner to co-group two streams in effectively the batch manner. An operator that uses an internal sorter will be used to perform the co-group operation. There is no need for users of the DataStream API to explicitly know or set the internal sorter in any way.
> > > > > > > > In the future, we plan to incrementally optimize other aggregation operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows is used as the window assigner.
> > > > > > > > Best,
> > > > > > > > Dong
> > > > > > > > > Best regards,
> > > > > > > > > Jing
> > > > > > > > > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > Hi Jing,
> > > > > > > > > > Thank you for the comments! Please see my reply inline.
> > > > > > > > > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > > > > > > > Hi Dong,
> > > > > > > > > > > Thanks for the proposal! The FLIP is already in good shape. I got some NIT questions.
> > > > > > > > > > > 1.
It is a little bit weird to write the hint right after the motivation that some features have been moved to FLIP-331, because at that time readers don't know the context about which features are meant. I would suggest moving the note to the beginning of the "Public interfaces" section.
> > > > > > > > > > Given that the reviewer who commented on this email thread before I refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler to just remove any mention of FLIP-331. I have updated the FLIP accordingly.
> > > > > > > > > > > 2. It is also a little bit weird to describe all behaviour changes at first but only focus on one single feature, i.e. how to implement internalSorterSupported. TBH, I was lost while I was reading the Public interfaces. Maybe change the FLIP title? Another option could be to write a short summary of all features and point out that this FLIP will only focus on the internalSorterSupported feature. Others could be found in FLIP-331. WDYT?
Conceptually, the purpose of this FLIP is to allow a stream mode job to run
parts of the topology in batch mode so that it can apply
optimizations/computations that cannot be used together with checkpointing
(and are thus not usable in stream mode). Although the internal sorter is
the only optimization immediately supported in this FLIP, the FLIP lays the
foundation for supporting other optimizations in the future, such as using
a GPU to process a bounded stream of records.

Therefore, I find it better to keep the current title rather than limiting
the scope to the internal sorter. What do you think?

> 3. There should be a typo at "4) Checkpoint and failover strategy ->
> Mixed mode":
>
> - If any task fails when isBacklog=false true, this task is restarted
> to re-process its input from the beginning.

Thank you for catching this issue. It is fixed now.
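As an aside, the batch-style co-group that Dong describes earlier in this message (an operator backed by an internal sorter that buffers both bounded inputs and only emits results once all input has been seen) can be illustrated with a plain-Java sketch. This is not Flink code and all names here are made up for illustration; the real feature is exposed through DataStream#coGroup with EndOfStreamWindows:

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative sketch (not Flink's implementation): a batch-style co-group
// that, like an operator with an internal sorter, groups both bounded inputs
// by key and only emits results at "end of stream".
final class CoGroupSketch {
    static <K, A, B, R> List<R> coGroup(
            List<A> left, List<B> right,
            Function<A, K> leftKey, Function<B, K> rightKey,
            BiFunction<List<A>, List<B>, R> combiner) {
        // Buffer and group (conceptually: sort) each input by key first.
        Map<K, List<A>> leftGroups = new LinkedHashMap<>();
        for (A a : left) {
            leftGroups.computeIfAbsent(leftKey.apply(a), k -> new ArrayList<>()).add(a);
        }
        Map<K, List<B>> rightGroups = new LinkedHashMap<>();
        for (B b : right) {
            rightGroups.computeIfAbsent(rightKey.apply(b), k -> new ArrayList<>()).add(b);
        }
        // Only now, once all input is consumed, emit one result per key.
        Set<K> keys = new LinkedHashSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());
        List<R> out = new ArrayList<>();
        for (K k : keys) {
            out.add(combiner.apply(
                    leftGroups.getOrDefault(k, List.of()),
                    rightGroups.getOrDefault(k, List.of())));
        }
        return out;
    }
}
```

The point of the pattern is that nothing is emitted per record; output only becomes available after end of input, which is why checkpointing during that phase has little value.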
Best,
Dong

> Best regards,
> Jing

On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks for your comments! Please see my reply inline.

On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Dong,
>
> I have a couple of questions.
>
> Could you explain why those properties
>
>     @Nullable private Boolean isOutputOnEOF = null;
>     @Nullable private Boolean isOutputOnCheckpoint = null;
>     @Nullable private Boolean isInternalSorterSupported = null;
>
> must be `@Nullable`, instead of having the default value set to `false`?

By initializing these private variables in OperatorAttributesBuilder as
null, we can implement `OperatorAttributesBuilder#build()` in such a way
that it prints DEBUG-level logging to say "isOutputOnCheckpoint is not
explicitly set". This can help users/SREs debug performance issues (or the
lack of an expected optimization) caused by operators not explicitly
setting the right operator attribute.

For example, we might want a job to always use the longer checkpointing
interval (i.e. execution.checkpointing.interval-during-backlog) if all
running operators have isOutputOnCheckpoint == false, and use the short
checkpointing interval otherwise. If a user has explicitly configured
execution.checkpointing.interval-during-backlog but the two-phase commit
sink library has not been upgraded to set isOutputOnCheckpoint=true, then
the job will end up using the long checkpointing interval, and it will be
useful to figure out what is going wrong in this case by checking the log.

Note that the default values of these fields in the OperatorAttributes
instance built by OperatorAttributesBuilder will still be false.
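The nullable-then-default pattern Dong describes could look roughly like the following plain-Java sketch. This is a hypothetical illustration of the idea, not the actual Flink classes; the logger call stands in for whatever logging framework the real implementation uses:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of the pattern: null means "not explicitly set by the operator
// developer", which build() can log before falling back to the default.
final class OperatorAttributes {
    final boolean isOutputOnEOF;
    final boolean isOutputOnCheckpoint;
    final boolean isInternalSorterSupported;

    OperatorAttributes(boolean eof, boolean cp, boolean sorter) {
        this.isOutputOnEOF = eof;
        this.isOutputOnCheckpoint = cp;
        this.isInternalSorterSupported = sorter;
    }
}

final class OperatorAttributesBuilder {
    private static final Logger LOG =
            Logger.getLogger(OperatorAttributesBuilder.class.getName());

    private Boolean isOutputOnEOF;            // null = not explicitly set
    private Boolean isOutputOnCheckpoint;     // null = not explicitly set
    private Boolean isInternalSorterSupported;

    OperatorAttributesBuilder setOutputOnEOF(boolean v) {
        isOutputOnEOF = v;
        return this;
    }

    OperatorAttributesBuilder setOutputOnCheckpoint(boolean v) {
        isOutputOnCheckpoint = v;
        return this;
    }

    OperatorAttributesBuilder setInternalSorterSupported(boolean v) {
        isInternalSorterSupported = v;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(
                defaulting("isOutputOnEOF", isOutputOnEOF),
                defaulting("isOutputOnCheckpoint", isOutputOnCheckpoint),
                defaulting("isInternalSorterSupported", isInternalSorterSupported));
    }

    private static boolean defaulting(String name, Boolean value) {
        if (value == null) {
            // The DEBUG-level breadcrumb that helps a user/SRE see why an
            // expected optimization did not kick in.
            LOG.log(Level.FINE, "{0} is not explicitly set; defaulting to false", name);
            return false;
        }
        return value;
    }
}
```

With `boolean` fields defaulting to `false`, the builder could not distinguish "explicitly set to false" from "never set", so the breadcrumb would be impossible; that is the whole argument for `@Nullable Boolean`.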
The following is mentioned in the Javadoc of
`OperatorAttributesBuilder#build()`:

/**
 * If any operator attribute is null, we will log it at DEBUG level and
 * use the following default values:
 * - isOutputOnEOF defaults to false
 * - isOutputOnCheckpoint defaults to false
 * - isInternalSorterSupported defaults to false
 */

> Second question, have you thought about cases where someone is either
> bootstrapping from a streaming source like Kafka, or simply trying to
> catch up after a long period of downtime in a purely streaming job?
> Generally speaking, cases where the user doesn't care about latency in
> the catch-up phase, regardless of whether the source is bounded or
> unbounded, but wants to process the data as fast as possible and then
> switch dynamically to real-time processing?

Yes, I have thought about this. We should allow such a job to effectively
run in batch mode while it is in the catch-up phase. FLIP-327 is actually
an important step toward addressing this use-case.
In order to address the above use-case, all we need is a way for the source
operator (e.g. Kafka) to tell the Flink runtime (via IsProcessingBacklog)
whether it is in the catch-up phase.

Since every Kafka message has an event timestamp, we can allow users to
specify a job-level config such as backlog-watermark-lag-threshold, and
consider a Kafka source to have IsProcessingBacklog=true if
system_time - watermark > backlog-watermark-lag-threshold. This effectively
allows us to determine whether Kafka is in the catch-up phase.

Once we have this capability (I plan to work on it in FLIP-328), we can
directly use the features proposed in FLIP-325 and FLIP-327 to optimize the
above use-case.

What do you think?
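The lag-based check Dong sketches in prose is a one-line predicate. The sketch below is illustrative only: `backlog-watermark-lag-threshold` is the config name proposed in the thread, while the class and method names are made up here:

```java
// Illustrative sketch of the backlog check from the thread: a source is
// considered to be processing backlog while its watermark lags behind
// system time by more than the configured threshold
// (backlog-watermark-lag-threshold).
final class BacklogDetector {
    static boolean isProcessingBacklog(long systemTimeMs, long watermarkMs,
                                       long lagThresholdMs) {
        return systemTimeMs - watermarkMs > lagThresholdMs;
    }
}
```

For example, with a 5-minute threshold, a source whose watermark is an hour behind wall-clock time would report IsProcessingBacklog=true, letting the runtime switch to the longer checkpointing interval until the source catches up.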
Best,
Dong

> Best,
> Piotrek

niedz., 2 lip 2023 o 16:15 Dong Lin <lindon...@gmail.com> napisał(a):

Hi all,

I am opening this thread to discuss FLIP-327: Support stream-batch unified
operator to improve job throughput when processing backlog data. The design
doc can be found at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
.

This FLIP enables a Flink job to initially operate in batch mode, achieving
high throughput while processing records that do not require low processing
latency. Subsequently, the job can seamlessly transition to stream mode for
processing real-time records with low latency. Importantly, the same state
can be utilized before and after this mode switch, making it particularly
valuable when users wish to bootstrap the job's state using historical
data.

We would greatly appreciate any comments or feedback you may have on this
proposal.

Cheers,
Dong