Hi,

Sorry for the long delay in responding!
> Given that it is an optional feature that can be turned off by users, it
> might be OK to just let users try it out and we can fix performance issues
> once we detect any of them. What do you think?

I think it's fine. It would be best to mark this feature as experimental, and
to say that the config keys or the default values might change in the future.

> Maybe we can revisit the need for such a config when we introduce/discuss
> the capability to switch backlog from false to true in the future. What do
> you think?

Sure, we can do that.

Best,
Piotrek

Sun, Jul 23, 2023 at 14:32 Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks a lot for the explanation. Please see my reply inline.
>
> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi Dong,
> >
> > Thanks a lot for the answers. I can now only briefly answer your last
> > email.
> >
> > > It is possible that spilling to disks might cause larger overhead. IMO
> > > it is an orthogonal issue already existing in Flink. This is because a
> > > Flink job running in batch mode might also be slower than its
> > > throughput in stream mode due to the same reason.
> >
> > Yes, I know, but the thing that worries me is that previously only the
> > user alone could decide whether to use batch mode or streaming, and in
> > practice one user would rarely (if ever) use both for the same
> > problem/job/query. If their intention was to eventually process live
> > data, they used streaming even if there was a large backlog at the start
> > (apart from some very few power users).
> >
> > With this change, we want to introduce a mode that would switch back and
> > forth between streaming and "batch in streaming" automatically. So a
> > potential performance regression would be much more visible and painful
> > at the same time. If a batch query runs slower than it could, that's
> > kind of fine, as it will end at some point. If a streaming query under
> > large backpressure, or maybe during a temporary load spike, switches to
> > batch processing, that's a bigger deal. Especially if the batch
> > processing mode is not able to handle even the normal load after the
> > load spike. In that case, the job could never recover from the
> > backpressure/backlog mode.
>
> I understand you are concerned with the risk of performance regression
> introduced due to switching to batch mode.
>
> After thinking about this more, I think this existing proposal meets the
> minimum requirement of "not introducing regression for existing jobs". The
> reason is that even if batch mode can be slower than stream mode for some
> operators in some cases, this is an optional feature that will only be
> enabled if a user explicitly overrides the newly introduced config to
> non-default values. Existing jobs that simply upgrade their Flink library
> version will not suffer any performance regression.
>
> More specifically, in order to switch to batch mode, users will need to
> explicitly set execution.checkpointing.interval-during-backlog to 0. And
> users can always explicitly update
> execution.checkpointing.interval-during-backlog to turn off the batch mode
> if it incurs any performance issue.
>
> As far as I can tell, for all practical workloads we see in production
> jobs, batch mode is always faster (w.r.t. throughput) than stream mode
> when there is a high backlog of incoming records. Though it is still
> theoretically possible, it should be very rare (if it happens at all) for
> batch mode to be slower in practice. Given that it is an optional feature
> that can be turned off by users, it might be OK to just let users try it
> out and we can fix performance issues once we detect any of them. What do
> you think?
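>
> To make the opt-in concrete, here is a minimal sketch of what a user
> would write (assuming the configuration key proposed in this FLIP; since
> the feature is experimental, the key name and default may still change):
>
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>     Configuration conf = new Configuration();
>     // A value of 0 means "no checkpoints while isBacklog=true", which is
>     // what allows the runtime to apply the batch-style optimizations
>     // discussed above.
>     conf.setString("execution.checkpointing.interval-during-backlog", "0");
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(conf);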
>
> > > > execution.backlog.use-full-batch-mode-on-start (default false)
> >
> > Oops, sorry, it was supposed to be something like:
> >
> > execution.backlog.use-batch-mode-only-on-start (default false)
> >
> > That option would disallow switching from streaming to batch. Batch mode
> > would be allowed only to get rid of the initial backlog that is present
> > on start-up.
> >
> > It would allow us to safely experiment with switching from streaming to
> > batch, and I would actually be more comfortable enabling "use batch mode
> > on start" by default, until we gain confidence and feedback that
> > switching back and forth works as expected.
>
> Now I understand what you are suggesting. I agree that it is necessary for
> users to be able to disallow switching from streaming to batch.
>
> I am not sure it is necessary to introduce an extra config just for this
> purpose. The reason is that we don't have any strategy that switches the
> backlog status from false to true yet. And when we have such a strategy
> (e.g. FLIP-328) in the future, it is very likely that we will introduce
> extra config(s) for users to explicitly turn on such a feature. That means
> users should be able to turn off this feature even if we don't have
> something like execution.backlog.use-batch-mode-only-on-start.
>
> Maybe we can revisit the need for such a config when we introduce/discuss
> the capability to switch backlog from false to true in the future. What do
> you think?
>
> > > > Or we could limit the scope of this FLIP to only support starting
> > > > with batch mode and switching only once to streaming, and design a
> > > > follow up with switching back and forth?
> > >
> > > Sure, that sounds good to me. I am happy to split this FLIP into two
> > > FLIPs so that we can make incremental progress.
> >
> > Great, let's do that. In a follow up FLIP we could restart the
> > discussion about switching back and forth.
>
> Cool, I added the following statement to the motivation section.
>
> "NOTE: this FLIP focuses only on the capability to switch from batch to
> stream mode. If there is any extra API needed to support switching from
> stream to batch mode, we will discuss it in a follow-up FLIP."
>
> I am looking forward to reading your follow-up thoughts!
>
> Best,
> Dong
>
> > Piotrek
> >
> > Thu, Jul 20, 2023 at 16:57 Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thank you for the very detailed comments! Please see my reply inline.
> > >
> > > On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski
> > > <piotr.nowoj...@gmail.com> wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > I have a couple of follow up questions about switching back and
> > > > forth between streaming and batching mode, especially around the
> > > > shuffle/watermark strategy and the keyed state backend.
> > > >
> > > > First of all, it might not always be beneficial to switch into the
> > > > batch modes:
> > > > - Shuffle strategy
> > > >   - Is sorting going to be purely in-memory? If not, obviously
> > > >     spilling to disks might cause larger overheads compared to not
> > > >     sorting the records.
> > >
> > > Sorting might require spilling data to disk depending on the input
> > > size.
> > > The behavior of sorting w.r.t. memory/disk is expected to be exactly
> > > the same as the behavior of the input sorting automatically performed
> > > by the Flink runtime in batch mode for keyed inputs.
> > >
> > > More specifically, ExternalSorter
> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java>
> > > is currently used to sort keyed inputs in batch mode. It is
> > > automatically used by the Flink runtime in OneInputStreamTask (here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>)
> > > and in MultiInputSortingDataInput (here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>).
> > > We plan to re-use the same code/mechanism to do the sorting.
> > >
> > > It is possible that spilling to disks might cause larger overhead. IMO
> > > it is an orthogonal issue already existing in Flink. This is because a
> > > Flink job running in batch mode might also be slower than its
> > > throughput in stream mode due to the same reason. However, even though
> > > it is possible in theory, I expect that in practice the throughput of
> > > using sorting + BatchExecutionKeyedStateBackend should be much higher
> > > than using other keyed state backends when the amount of data is
> > > large. As a matter of fact, we have not heard complaints about such
> > > performance regression issues in batch mode.
> > >
> > > The primary goal of this FLIP is to allow the operator to run at the
> > > same throughput (in stream mode when there is backlog) as it currently
> > > can in batch mode. And this goal is not affected by the disk overhead
> > > issue mentioned above.
> > >
> > > I am thinking maybe we can treat it as an orthogonal performance
> > > optimization problem instead of solving it in this FLIP?
> > >
> > > >   - If it will be at least partially in-memory, does Flink have some
> > > >     mechanism to reserve optional memory that can be revoked if a
> > > >     new operator starts up? Can this memory be redistributed?
> > > >     Ideally we should use as much as possible of the available
> > > >     memory to avoid spilling costs, but also be able to revoke that
> > > >     memory.
> > >
> > > This FLIP does not support dynamically revoking/redistributing managed
> > > memory used by the ExternalSorter.
> > >
> > > For operators with isInternalSorterSupported = true, we will allocate
> > > to the operator execution.sorted-inputs.memory
> > > <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144>
> > > amount of managed memory. This is the same as how Flink allocates
> > > managed memory to an operator when the operator has keyed inputs in
> > > batch mode.
> > >
> > > Note that this FLIP intends to support operators that sort inputs
> > > whenever there is backlog. And there is currently no way for an
> > > operator to know in advance whether there will be no backlog after a
> > > given time. So it seems simpler to just keep the managed memory for
> > > such an operator throughout the lifecycle of the operator, for now.
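> > >
> > > As a concrete illustration, an operator developer would declare this
> > > roughly as follows (a sketch based on the OperatorAttributes API
> > > proposed in this FLIP; exact signatures may differ in the final
> > > version):
> > >
> > >     // Inside a StreamOperator implementation: declare that this
> > >     // operator sorts its input internally, so the runtime allocates
> > >     // execution.sorted-inputs.memory of managed memory to it and does
> > >     // not insert its own external sort in front of the operator.
> > >     @Override
> > >     public OperatorAttributes getOperatorAttributes() {
> > >         return new OperatorAttributesBuilder()
> > >             .setInternalSorterSupported(true)
> > >             .build();
> > >     }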
> > > Besides, it seems that the lack of ability to dynamically
> > > revoke/redistribute unused managed memory is an existing issue in
> > > Flink. For example, we might have two operators sharing the same slot,
> > > where both operators use managed memory (e.g. to sort inputs). There
> > > is currently no way for one operator to re-use the memory not used by
> > > the other operator.
> > >
> > > Therefore, I think we can treat this as an orthogonal performance
> > > optimization problem which can be addressed separately. What do you
> > > think?
> > >
> > > >   - Sometimes sorting, even if we have the memory to do it, might be
> > > >     an unnecessary overhead.
> > > > - Watermarks
> > > >   - Is holding back watermarks always good? Suppose we have tons of
> > > >     data buffered/sorted and waiting to be processed, with multiple
> > > >     windows per key and many different keys. When we switch back to
> > > >     `isBacklog=false` we first process all of that data before
> > > >     processing watermarks. For operators that are not using sorted
> > > >     input, the state size can explode significantly, causing lots of
> > > >     problems. Even for those that can use sorting, switching to
> > > >     sorting or BatchExecutionKeyedStateBackend is not always a good
> > > >     idea, but keeping RocksDB can also be risky.
> > >
> > > With the current FLIP, the proposal is to use a sorter only when the
> > > inputs have keys. Following this practice, operators which are not
> > > using sorting should have un-keyed inputs. I believe such an operator
> > > will not even use a keyed state backend. Maybe I missed some use-case.
> > > Can you provide a use-case where we will have an operator with
> > > un-keyed inputs whose state size can explode due to holding back
> > > watermarks?
> > >
> > > For operators with keyed inputs that use sorting, I suppose it is
> > > possible that sorting + BatchExecutionKeyedStateBackend can be worse
> > > than using RocksDB. But I believe this is very rare (if possible at
> > > all) in almost any practical usage of Flink.
> > >
> > > Taking one step back: if this indeed causes a regression for a real
> > > use-case, the user can set
> > > execution.checkpointing.interval-during-backlog to anything other
> > > than 0 so that this FLIP will not use the sorter +
> > > BatchExecutionKeyedStateBackend even when there is backlog.
> > >
> > > I would hope we can find a way to automatically determine whether
> > > using sorting + BatchExecutionKeyedStateBackend is better or worse
> > > than using RocksDB alone, but I could not find a good and reliable way
> > > to do this. Maybe we can update Flink to do this when we find a good
> > > way in the future?
> > >
> > > > - Keyed state backend
> > > >   - I think you haven't described what happens during switching from
> > > >     streaming to backlog processing.
> > >
> > > Good point. This indeed needs to be described. I added a TODO in the
> > > "Behavior changes ..." section to describe what happens when isBacklog
> > > switches from false to true, for all of
> > > watermark/checkpoint/statebackend etc.
> > >
> > > Let me explain this for the state backend here for now. I will update
> > > the FLIP later.
> > >
> > > When isBacklog switches from false to true, an operator with keyed
> > > inputs can optionally (as determined by its implementation) start to
> > > use an internal sorter to sort inputs by key, without processing
> > > inputs or updating the state backend, until it receives end-of-inputs
> > > or isBacklog switches to false again.
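> > >
> > > In other words, the operator's processing path would look roughly like
> > > the following sketch (illustrative only; `sorter`, `isBacklog` and the
> > > helper methods are placeholder names, not the final API of this FLIP):
> > >
> > >     public void processElement(StreamRecord<IN> record) throws Exception {
> > >         if (isBacklog) {
> > >             // Backlog phase: buffer the record in the internal sorter;
> > >             // the keyed state backend is not touched yet.
> > >             sorter.writeRecord(record.getValue());
> > >         } else {
> > >             // Normal streaming path with per-record state access.
> > >             processAndUpdateState(record.getValue());
> > >         }
> > >     }
> > >
> > >     // Called when the runtime signals that the backlog has ended (or
> > >     // at end-of-inputs): drain the sorted records in key order, then
> > >     // resume low-latency streaming processing.
> > >     private void onBacklogEnd() throws Exception {
> > >         Iterator<IN> sorted = sorter.sortedIterator();
> > >         while (sorted.hasNext()) {
> > >             processAndUpdateState(sorted.next());
> > >         }
> > >         isBacklog = false;
> > >     }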
> > >
> > > >   - The switch can be an unnecessary overhead.
> > >
> > > I agree it can cause unnecessary overhead, particularly when isBacklog
> > > switches back and forth frequently. Whether or not it is unnecessary
> > > likely depends on the duration/throughput of the backlog phase as well
> > > as the specific computation logic of the operator. I am not sure there
> > > is a good way for Flink to determine in advance whether switching is
> > > unnecessary.
> > >
> > > Note that for the existing use-cases where we expect to change
> > > isBacklog to true (e.g. the MySQL CDC snapshot phase, or the Kafka
> > > source watermark lag being too high), we don't expect the backlog
> > > status to switch back and forth frequently. And users can disable this
> > > switch by setting execution.checkpointing.interval-during-backlog to
> > > anything other than 0.
> > >
> > > Therefore, I am wondering if we can also view this as a performance
> > > optimization opportunity for extra use-cases in the future, rather
> > > than a blocking issue of this FLIP for the MVP use-case (e.g. the
> > > snapshot phase for any CDC source, or Kafka watermark lag).
> > >
> > > > At the same time, in your current proposal, for
> > > > `execution.checkpointing.interval-during-backlog > 0` we won't
> > > > switch to "batch" mode at all. That's a bit of a shame; I don't
> > > > understand why those two things should be coupled together?
> > >
> > > We can in general classify optimizations into those that are
> > > compatible with checkpointing, and those that are not. For example,
> > > input sorting is currently not compatible with checkpointing. And
> > > buffering input records to reduce state backend overhead (and probably
> > > columnar processing for mini-batches in the future) is compatible with
> > > checkpointing.
> > >
> > > The primary goal of FLIP-327 is to support optimizations that are not
> > > compatible with checkpointing. If
> > > execution.checkpointing.interval-during-backlog > 0, which means that
> > > the user intends to still do checkpointing even when there is backlog,
> > > then we will not be able to support such optimizations.
> > >
> > > For optimizations that are compatible with checkpointing, we can apply
> > > them even when the operator does not run in "batch mode". There are
> > > extra problems to solve in order to achieve this optimization, such as
> > > supporting unaligned checkpointing without prolonging its sync phase.
> > > I plan to explain how this can be done in FLIP-325.
> > >
> > > > All in all, shouldn't we aim for some more clever process of
> > > > switching back and forth between streaming/batch modes for the
> > > > watermark strategy/state backend/sorting based on some metrics?
> > > > Trying to either predict if switching might help, or trying to
> > > > estimate if the last switch was beneficial?
> > > > Maybe something along the lines of:
> > > > - sort only in memory, and during sorting count the number of
> > > >   distinct keys (NDK)
> > > > - maybe allow for spilling if so far in memory we have
> > > >   NDK * 5 >= #records
> > > > - do not allow buffering records above a certain threshold, as
> > > >   otherwise checkpointing can explode
> > > > - switch to `BatchExecutionKeyedStateBackend` only if
> > > >   NDK * 2 >= #records
> > > > - do not sort if the last NDK (or an EMA of the NDK?) * 1.5 <=
> > > >   #records
> > > >
> > > > Or maybe for starters something even simpler, and then test out
> > > > something more fancy as a follow up?
> > >
> > > I agree it is worth investigating these ideas to further optimize the
> > > performance during backlog.
> > >
> > > I just think these can be done independently after this FLIP. The
> > > focus of this FLIP is to re-use in stream mode the same optimizations
> > > which we already use in batch mode, rather than inventing new
> > > optimizations or improving the performance of the existing ones.
> > >
> > > Given that there are already a lot of new mechanisms/features to
> > > discuss and address in this FLIP, I am hoping we can limit its scope
> > > to re-using the existing optimizations, and treat these extra
> > > optimization opportunities as future work.
> > >
> > > What do you think?
> > >
> > > > At the same time,
> > > > `execution.checkpointing.interval-during-backlog=0` seems a weird
> > > > setting to me, one that I would not feel safe recommending to
> > > > anyone. If processing a backlog takes a long time, a job might stop
> > > > making any progress due to some random failures. It is especially
> > > > dangerous if a job switches from streaming mode back to backlog
> > > > processing for some reason, as that could happen months after
> > > > someone started a job with this strange setting. So should we even
> > > > have it? I would simply disallow it.
> > >
> > > Good point. I do agree we need further work to improve the failover
> > > performance in case any task fails.
> > >
> > > As of the current FLIP, if any task fails during backlog and
> > > execution.checkpointing.interval-during-backlog = 0, we will need to
> > > restart all operators to the last checkpointed state and continue
> > > processing the backlog. This can be a lot of rollback, since there is
> > > no checkpoint during backlog. And it can also be worse than batch
> > > mode, since this FLIP currently does not support exporting/saving
> > > records to local disk (or a shuffle service) so that a failed task can
> > > re-consume the records from the upstream task (or shuffle service) in
> > > the same way Flink fails over a task in batch mode.
> > >
> > > I think we can extend this FLIP to solve this problem so that it has
> > > at least the same behavior/performance as a batch-mode job. The idea
> > > is to also follow what batch mode does. For example, we can trigger a
> > > checkpoint when isBacklog switches to true, and every operator should
> > > buffer its output on the TM local disk (or a remote shuffle service).
> > > Then, after a task fails, it can restart from the last checkpoint and
> > > re-consume the data buffered in the upstream task.
> > >
> > > I will update the FLIP as described above. Would this address your
> > > concern?
> > >
> > > > I could see a power setting like:
> > > >     `execution.backlog.use-full-batch-mode-on-start (default false)`
> > >
> > > I am not sure I fully understand this config or its motivation. Can
> > > you help explain the exact semantics of this config?
> > >
> > > > It would override any heuristic of switching to backlog if someone
> > > > is submitting a new job that starts with `isBacklog=true`.
> > > >
> > > > Or we could limit the scope of this FLIP to only support starting
> > > > with batch mode and switching only once to streaming, and design a
> > > > follow up with switching back and forth?
> > >
> > > Sure, that sounds good to me. I am happy to split this FLIP into two
> > > FLIPs so that we can make incremental progress.
> > >
> > > Best,
> > > Dong
> > >
> > > > I'm looking forward to hearing/reading your thoughts.
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > Wed, Jul 12, 2023 at 12:38 Jing Ge <j...@ververica.com.invalid>
> > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for your reply!
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jing,
> > > > > >
> > > > > > Thanks for the comments. Please see my reply inline.
> > > > > >
> > > > > > On Wed, Jul 12, 2023 at 5:04 AM Jing Ge
> > > > > > <j...@ververica.com.invalid> wrote:
> > > > > >
> > > > > > > Hi Dong,
> > > > > > >
> > > > > > > Thanks for the clarification. Now it is clear to me. I got
> > > > > > > additional noob questions wrt the internal sorter.
> > > > > > >
> > > > > > > 1. When should the setter be called to set
> > > > > > > internalSorterSupported to true?
> > > > > >
> > > > > > The developer of the operator class (i.e. those classes which
> > > > > > implement `StreamOperator`) should override the
> > > > > > `#getOperatorAttributes()` API to set internalSorterSupported
> > > > > > to true, if he/she decides to sort records internally in the
> > > > > > operator.
> > > > > >
> > > > > > > 2.
> > > > > > > "For those operators whose throughput can be considerably
> > > > > > > improved with an internal sorter, update it to take advantage
> > > > > > > of the internal sorter when its input has isBacklog=true.
> > > > > > > Typically, operators that involve aggregation operations
> > > > > > > (e.g. join, cogroup, aggregate) on keyed inputs can benefit
> > > > > > > from using an internal sorter."
> > > > > > >
> > > > > > > "The operator that performs the CoGroup operation will
> > > > > > > instantiate two internal sorters to sort records from its two
> > > > > > > inputs separately. Then it can pull the sorted records from
> > > > > > > these two sorters. This can be done without wrapping input
> > > > > > > records with TaggedUnion<...>. In comparison, the existing
> > > > > > > DataStream#coGroup needs to wrap input records with
> > > > > > > TaggedUnion<...> before sorting them using one external
> > > > > > > sorter, which introduces higher overhead."
> > > > > > >
> > > > > > > According to the performance test, it seems that the internal
> > > > > > > sorter has better performance than the external sorter. Is it
> > > > > > > possible to make those operators that can benefit from it use
> > > > > > > the internal sorter by default?
> > > > > >
> > > > > > Yes, it is possible. After this FLIP is done, users can use
> > > > > > DataStream#coGroup with EndOfStreamWindows as the window
> > > > > > assigner to co-group two streams in effectively the batch
> > > > > > manner. An operator that uses an internal sorter will be used
> > > > > > to perform the co-group operation. There is no need for users
> > > > > > of the DataStream API to explicitly know or set the internal
> > > > > > sorter in any way.
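> > > > > >
> > > > > > For example, the user-facing code would look roughly like the
> > > > > > following sketch (EndOfStreamWindows is the window assigner
> > > > > > proposed in FLIP-331; the record types and the co-group
> > > > > > function are placeholders):
> > > > > >
> > > > > >     DataStream<Order> orders = ...;
> > > > > >     DataStream<Payment> payments = ...;
> > > > > >
> > > > > >     DataStream<Result> joined = orders
> > > > > >         .coGroup(payments)
> > > > > >         .where(order -> order.getOrderId())
> > > > > >         .equalTo(payment -> payment.getOrderId())
> > > > > >         // Assigns all records to a single window that fires at
> > > > > >         // the end of its inputs, which allows the runtime to
> > > > > >         // use the operator with the internal sorter described
> > > > > >         // above.
> > > > > >         .window(EndOfStreamWindows.get())
> > > > > >         .apply(new MyCoGroupFunction());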
> > > > > >
> > > > > > In the future, we plan to incrementally optimize other
> > > > > > aggregation operations (e.g. aggregate) on the DataStream API
> > > > > > when EndOfStreamWindows is used as the window assigner.
> > > > > >
> > > > > > Best,
> > > > > > Dong
> > > > > >
> > > > > > > Best regards,
> > > > > > > Jing
> > > > > > >
> > > > > > > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin
> > > > > > > <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jing,
> > > > > > > >
> > > > > > > > Thank you for the comments! Please see my reply inline.
> > > > > > > >
> > > > > > > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge
> > > > > > > > <j...@ververica.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi Dong,
> > > > > > > > >
> > > > > > > > > Thanks for the proposal! The FLIP is already in good
> > > > > > > > > shape. I got some NIT questions.
> > > > > > > > >
> > > > > > > > > 1. It is a little bit weird to write the hint right after
> > > > > > > > > the motivation that some features have been moved to
> > > > > > > > > FLIP-331, because at that point readers don't know the
> > > > > > > > > context about which features it means. I would suggest
> > > > > > > > > moving the note to the beginning of the "Public
> > > > > > > > > interfaces" section.
> > > > > > > >
> > > > > > > > Given that the reviewer who commented on this email thread
> > > > > > > > before I refactored the FLIP (i.e. Piotr) has read
> > > > > > > > FLIP-331, I think it is simpler to just remove any mention
> > > > > > > > of FLIP-331. I have updated the FLIP accordingly.
> > > > > > > >
> > > > > > > > > 2. It is also a little bit weird to describe all
> > > > > > > > > behaviour changes at first but only focus on one single
> > > > > > > > > feature, i.e. how to implement internalSorterSupported.
> > > > > > > > > TBH, I was lost while I was reading the Public
> > > > > > > > > interfaces. Maybe change the FLIP title? Another option
> > > > > > > > > could be to write a short summary of all features and
> > > > > > > > > point out that this FLIP will only focus on the
> > > > > > > > > internalSorterSupported feature. Others could be found in
> > > > > > > > > FLIP-331. WDYT?
> > > > > > > >
> > > > > > > > Conceptually, the purpose of this FLIP is to allow a stream
> > > > > > > > mode job to run parts of the topology in batch mode so that
> > > > > > > > it can apply optimizations/computations that cannot be used
> > > > > > > > together with checkpointing (and are thus not usable in
> > > > > > > > stream mode).
> > > > > > > > Although the internal sorter is the only optimization
> > > > > > > > immediately supported in this FLIP, this FLIP lays the
> > > > > > > > foundation to support other optimizations in the future,
> > > > > > > > such as using GPUs to process a bounded stream of records.
> > > > > > > >
> > > > > > > > Therefore, I find it better to keep the current title
> > > > > > > > rather than limiting the scope to the internal sorter. What
> > > > > > > > do you think?
> > > > > > > >
> > > > > > > > > 3. There should be a typo at 4) Checkpoint and failover
> > > > > > > > > strategy -> Mixed mode ->
> > > > > > > > >
> > > > > > > > > - If any task fails when isBacklog=false true, this task
> > > > > > > > >   is restarted to re-process its input from the
> > > > > > > > >   beginning.
> > > > > > > >
> > > > > > > > Thank you for catching this issue. It is fixed now.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Jing
> > > > > > > > >
> > > > > > > > > On Thu, Jul 6, 2023 at 1:24 PM Dong Lin
> > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Piotr,
> > > > > > > > > >
> > > > > > > > > > Thanks for your comments! Please see my reply inline.
> > > > > > > > > >
> > > > > > > > > > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski
> > > > > > > > > > <piotr.nowoj...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Dong,
> > > > > > > > > > >
> > > > > > > > > > > I have a couple of questions.
> > > > > > > > > > >
> > > > > > > > > > > Could you explain why those properties
> > > > > > > > > > >
> > > > > > > > > > >     @Nullable private Boolean isOutputOnEOF = null;
> > > > > > > > > > >     @Nullable private Boolean isOutputOnCheckpoint = null;
> > > > > > > > > > >     @Nullable private Boolean isInternalSorterSupported = null;
> > > > > > > > > > >
> > > > > > > > > > > must be `@Nullable`, instead of having the default
> > > > > > > > > > > value set to `false`?
> > > > > > > > > >
> > > > > > > > > > By initializing these private variables in
> > > > > > > > > > OperatorAttributesBuilder as null, we can implement
> > > > > > > > > > `OperatorAttributesBuilder#build()` in such a way that
> > > > > > > > > > it prints DEBUG level logging to say
> > > > > > > > > > "isOutputOnCheckpoint is not explicitly set". This can
> > > > > > > > > > help users/SREs debug performance issues (or the lack
> > > > > > > > > > of an expected optimization) due to operators not
> > > > > > > > > > explicitly setting the right operator attribute.
> > > > > > > > > >
> > > > > > > > > > For example, we might want a job to always use the
> > > > > > > > > > longer checkpointing interval (i.e.
> > > > > > > > > > execution.checkpointing.interval-during-backlog) if all
> > > > > > > > > > running operators have isOutputOnCheckpoint==false, and
> > > > > > > > > > use the short checkpointing interval otherwise.
> > > > > > > > > > If a user has explicitly configured
> > > > > > > > > > execution.checkpointing.interval-during-backlog but the
> > > > > > > > > > two-phase commit sink library has not been upgraded to
> > > > > > > > > > set isOutputOnCheckpoint=true, then the job will end up
> > > > > > > > > > using the long checkpointing interval, and it will be
> > > > > > > > > > useful to figure out what is going wrong in this case
> > > > > > > > > > by checking the log.
> > > > > > > > > >
> > > > > > > > > > Note that the default values of these fields of the
> > > > > > > > > > OperatorAttributes instance built by
> > > > > > > > > > OperatorAttributesBuilder will still be false. The
> > > > > > > > > > following is mentioned in the Java doc of
> > > > > > > > > > `OperatorAttributesBuilder#build()`:
> > > > > > > > > >
> > > > > > > > > >     /**
> > > > > > > > > >      * If any operator attribute is null, we will log
> > > > > > > > > >      * it at DEBUG level and use the following default
> > > > > > > > > >      * values.
> > > > > > > > > >      * - isOutputOnEOF defaults to false
> > > > > > > > > >      * - isOutputOnCheckpoint defaults to false
> > > > > > > > > >      * - isInternalSorterSupported defaults to false
> > > > > > > > > >      */
> > > > > > > > > >
> > > > > > > > > > > Second question, have you thought about cases where
> > > > > > > > > > > someone is either bootstrapping from a streaming
> > > > > > > > > > > source like Kafka, or simply trying to catch up after
> > > > > > > > > > > a long period of downtime in a purely streaming job?
> > > > > > > > > > > Generally speaking, cases where the user doesn't care
> > > > > > > > > > > about latency in the catch-up phase, regardless of
> > > > > > > > > > > whether the source is bounded or unbounded, but wants
> > > > > > > > > > > to process the data as fast as possible, and then
> > > > > > > > > > > switch dynamically to real-time processing?
> > > > > > > > > >
> > > > > > > > > > Yes, I have thought about this. We should allow this
> > > > > > > > > > job to effectively run in batch mode when the job is in
> > > > > > > > > > the catch-up phase. FLIP-327 is actually an important
> > > > > > > > > > step toward addressing this use-case.
> > > > > > > > > >
> > > > > > > > > > In order to address the above use-case, all we need is
> > > > > > > > > > a way for the source operator (e.g. Kafka) to tell the
> > > > > > > > > > Flink runtime (via IsProcessingBacklog) whether it is
> > > > > > > > > > in the catch-up phase.
> > > > > > > > > >
> > > > > > > > > > Since every Kafka message has an event timestamp, we
> > > > > > > > > > can allow users to specify a job-level config such as
> > > > > > > > > > backlog-watermark-lag-threshold, and consider a Kafka
> > > > > > > > > > source to have IsProcessingBacklog=true if
> > > > > > > > > > system_time - watermark > backlog-watermark-lag-threshold.
> > > > > > > > > > This effectively allows us to determine whether Kafka
> > > > > > > > > > is in the catch-up phase.
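> > > > > > > > > >
> > > > > > > > > > In pseudo-Java, the check would be roughly as follows
> > > > > > > > > > (names are illustrative, not a final API):
> > > > > > > > > >
> > > > > > > > > >     // The source reports IsProcessingBacklog=true
> > > > > > > > > >     // while its watermark lags the wall clock by more
> > > > > > > > > >     // than the configured threshold.
> > > > > > > > > >     boolean isProcessingBacklog =
> > > > > > > > > >         System.currentTimeMillis() - currentWatermarkMillis
> > > > > > > > > >             > backlogWatermarkLagThresholdMillis;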
> > > > > > > > > > Once we have this capability (I plan to work on this
> > > > > > > > > > in FLIP-328), we can directly use the features proposed
> > > > > > > > > > in FLIP-325 and FLIP-327 to optimize the above
> > > > > > > > > > use-case.
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Piotrek
> > > > > > > > > > >
> > > > > > > > > > > Sun, Jul 2, 2023 at 16:15 Dong Lin
> > > > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I am opening this thread to discuss FLIP-327:
> > > > > > > > > > > > Support stream-batch unified operator to improve
> > > > > > > > > > > > job throughput when processing backlog data. The
> > > > > > > > > > > > design doc can be found at
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > > > > > > > > >
> > > > > > > > > > > > This FLIP enables a Flink job to initially operate
> > > > > > > > > > > > in batch mode, achieving high throughput while
> > > > > > > > > > > > processing records that do not require low
> > > > > > > > > > > > processing latency. Subsequently, the job can
> > > > > > > > > > > > seamlessly transition to stream mode for processing
> > > > > > > > > > > > real-time records with low latency. Importantly,
> > > > > > > > > > > > the same state can be utilized before and after
> > > > > > > > > > > > this mode switch, making it particularly valuable
> > > > > > > > > > > > when users wish to bootstrap the job's state using
> > > > > > > > > > > > historical data.
> > > > > > > > > > > >
> > > > > > > > > > > > We would greatly appreciate any comments or
> > > > > > > > > > > > feedback you may have on this proposal.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Dong