Hi Dong,

The operators API is unfortunately also part of our public-facing API, so the
APIs that we will add there should also be marked `@Experimental` IMO.

The config options should also be marked as experimental (both annotated with
@Experimental and noted as such in the docs, if the @Experimental annotation
is not automatically surfaced in the docs).
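
For illustration, what I mean is something along these lines (a sketch; the
option name is taken from this thread, everything else is an assumption):

    @Experimental
    public static final ConfigOption<Duration> INTERVAL_DURING_BACKLOG =
        ConfigOptions.key("execution.checkpointing.interval-during-backlog")
            .durationType()
            .noDefaultValue()
            .withDescription(
                "Experimental. The checkpointing interval used while a source "
                    + "reports isProcessingBacklog=true. The key and the "
                    + "default value might change in future releases.");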

> Alternatively, how about we add a doc for
checkpointing.interval-during-backlog explaining its impact/concern as
discussed above?

We should do this independently of marking the APIs/config options as
`@Experimental`.

Best,
Piotrek

Fri, 11 Aug 2023 at 14:55 Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for the reply!
>
> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Sorry for the long delay in responding!
> >
> > > Given that it is an optional feature that can be turned off by users, it
> > > might be OK to just let users try it out and we can fix performance
> > > issues once we detect any of them. What do you think?
> >
> > I think it's fine. It would be best to mark this feature as experimental,
> > and say that the config keys or the default values might change in the
> > future.
> >
>
> In general I agree we can mark APIs that determine "whether to enable
> dynamic switching between stream/batch mode" as experimental.
>
> However, I am not sure we have such an API yet. The APIs added in this FLIP
> are intended to be used by operator developers rather than end users. End
> users can enable this capability by setting
> execution.checkpointing.interval-during-backlog = Long.MAX_VALUE and using a
> source which might implicitly set the backlog status (e.g. HybridSource). So
> execution.checkpointing.interval-during-backlog is the only user-facing API
> that can always control whether this feature can be used.
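>
> For example (a sketch; historicalFileSource and realTimeKafkaSource are
> placeholders, and the option key is the one discussed in this thread):
>
>     Configuration conf = new Configuration();
>     conf.setString(
>         "execution.checkpointing.interval-during-backlog",
>         String.valueOf(Long.MAX_VALUE));
>
>     // HybridSource may implicitly set the backlog status while reading
>     // the historical part of the input.
>     HybridSource<String> source =
>         HybridSource.builder(historicalFileSource)
>             .addSource(realTimeKafkaSource)
>             .build();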
>
> However, execution.checkpointing.interval-during-backlog itself is not tied
> to FLIP-327.
>
> Do you mean we should mark checkpointing.interval-during-backlog as
> experimental? Alternatively, how about we add a doc for
> checkpointing.interval-during-backlog explaining its impact/concerns as
> discussed above?
>
> Best,
> Dong
>
>
> > > Maybe we can revisit the need for such a config when we introduce/discuss
> > > the capability to switch backlog from false to true in the future. What
> > > do you think?
> >
> > Sure, we can do that.
> >
> > Best,
> > Piotrek
> >
> > Sun, 23 Jul 2023 at 14:32 Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks a lot for the explanation. Please see my reply inline.
> > >
> > > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks a lot for the answers. I can now only briefly answer your last
> > > > email.
> > > >
> > > > > It is possible that spilling to disks might cause larger overhead. IMO
> > > > > it is an orthogonal issue already existing in Flink. This is because a
> > > > > Flink job running in batch mode might also be slower than in stream
> > > > > mode for the same reason.
> > > >
> > > > Yes, I know, but the thing that worries me is that previously only a
> > > > user alone could decide whether to use batch mode or streaming, and in
> > > > practice one user would rarely (if ever) use both for the same
> > > > problem/job/query. If his intention was to eventually process live data,
> > > > he was using streaming even if there was a large backlog at the start
> > > > (apart from some very few power users).
> > > >
> > > > With this change, we want to introduce a mode that would be switching
> > > > back and forth between streaming and "batch in streaming" automatically.
> > > > So a potential performance regression would be much more visible and
> > > > painful at the same time. If a batch query runs slower than it could,
> > > > it's kind of fine, as it will end at some point. If a streaming query
> > > > switches to batch processing during large backpressure caused by maybe a
> > > > temporary load spike, that's a bigger deal. Especially if the batch
> > > > processing mode will not be able to actually even handle the normal load
> > > > after the load spike. In that case, the job could never recover from the
> > > > backpressure/backlog mode.
> > > >
> > >
> > > I understand you are concerned with the risk of performance regression
> > > introduced by switching to batch mode.
> > >
> > > After thinking about this more, I think the existing proposal meets the
> > > minimum requirement of "not introducing regression for existing jobs".
> > > The reason is that even if batch mode can be slower than stream mode for
> > > some operators in some cases, this is an optional feature that will only
> > > be enabled if a user explicitly overrides the newly introduced config to
> > > non-default values. Existing jobs that simply upgrade their Flink library
> > > version will not suffer any performance regression.
> > >
> > > More specifically, in order to switch to batch mode, users will need to
> > > explicitly set execution.checkpointing.interval-during-backlog to 0. And
> > > users can always explicitly update
> > > execution.checkpointing.interval-during-backlog to turn off the batch
> > > mode if that incurs any performance issue.
> > >
> > > As far as I can tell, for all practical workloads we see in production
> > > jobs, batch mode is always faster (w.r.t. throughput) than stream mode
> > > when there is a high backlog of incoming records. Though it is still
> > > theoretically possible, it should be very rare (if ever) for batch mode
> > > to be slower in practice. Given that it is an optional feature that can
> > > be turned off by users, it might be OK to just let users try it out and
> > > we can fix performance issues once we detect any of them. What do you
> > > think?
> > >
> > >
> > > >
> > > > > execution.backlog.use-full-batch-mode-on-start (default false)
> > > >
> > > > oops, sorry, it was supposed to be something like:
> > > >
> > > > execution.backlog.use-batch-mode-only-on-start (default false)
> > > >
> > > > That option would disallow switching from streaming to batch. Batch mode
> > > > would be allowed only to get rid of the initial backlog present on
> > > > start-up.
> > > >
> > > > It would allow us to safely experiment with switching from streaming to
> > > > batch, and I would actually be more comfortable enabling "use batch mode
> > > > on start" by default, until we gain confidence and feedback that
> > > > switching back & forth is working as expected.
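> > > >
> > > > For illustration, such an option could be declared along these lines (a
> > > > sketch; the name and default come from this thread, the rest is my
> > > > assumption):
> > > >
> > > >     @Experimental
> > > >     public static final ConfigOption<Boolean> USE_BATCH_MODE_ONLY_ON_START =
> > > >         ConfigOptions.key("execution.backlog.use-batch-mode-only-on-start")
> > > >             .booleanType()
> > > >             .defaultValue(false)
> > > >             .withDescription(
> > > >                 "If true, batch mode may only be used to process the backlog "
> > > >                     + "present on start-up; switching from streaming back to "
> > > >                     + "batch mode is disallowed.");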
> > > >
> > >
> > > Now I understand what you are suggesting. I agree that it is necessary
> > > for users to be able to disallow switching from streaming to batch.
> > >
> > > I am not sure it is necessary to introduce an extra config just for this
> > > purpose. The reason is that we don't have any strategy that switches the
> > > backlog status from false to true yet. And when we have such a strategy
> > > (e.g. FLIP-328) in the future, it is very likely that we will introduce
> > > extra config(s) for users to explicitly turn on such a feature. That
> > > means users should be able to turn off this feature even if we don't have
> > > something like execution.backlog.use-batch-mode-only-on-start.
> > >
> > > Maybe we can revisit the need for such a config when we introduce/discuss
> > > the capability to switch backlog from false to true in the future. What
> > > do you think?
> > >
> > >
> > > >
> > > > >> Or we could limit the scope of this FLIP to only support starting
> > > > >> with batch mode and switching only once to streaming, and design a
> > > > >> follow up with switching back and forth?
> > > > >
> > > > > Sure, that sounds good to me. I am happy to split this FLIP into two
> > > > > FLIPs so that we can make incremental progress.
> > > >
> > > > Great, let's do that. In a follow-up FLIP we could restart the
> > > > discussion about switching back and forth.
> > > >
> > >
> > > Cool, I added the following statement to the motivation section.
> > >
> > > "NOTE: this FLIP focuses only on the capability to switch from batch to
> > > stream mode. If there is any extra API needed to support switching from
> > > stream to batch mode, we will discuss them in a follow-up FLIP."
> > >
> > > I am looking forward to reading your follow-up thoughts!
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > > Piotrek
> > > >
> > > > Thu, 20 Jul 2023 at 16:57 Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thank you for the very detailed comments! Please see my reply inline.
> > > > >
> > > > > On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > I have a couple of follow-up questions about switching back and
> > > > > > forth between streaming and batch mode, especially around the
> > > > > > shuffle/watermark strategy and the keyed state backend.
> > > > > >
> > > > > > First of all, it might not always be beneficial to switch into the
> > > > > > batch mode:
> > > > > > - Shuffle strategy
> > > > > >     - Is sorting going to be purely in-memory? If not, obviously
> > > > > >       spilling to disks might cause larger overheads compared to
> > > > > >       not sorting the records.
> > > > > >
> > > > >
> > > > > Sorting might require spilling data to disk depending on the input
> > > > > size. The behavior of sorting w.r.t. memory/disk is expected to be
> > > > > exactly the same as the behavior of input sorting automatically
> > > > > performed by the Flink runtime in batch mode for keyed inputs.
> > > > >
> > > > > More specifically, ExternalSorter
> > > > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java>
> > > > > is currently used to sort keyed inputs in batch mode. It is
> > > > > automatically used by the Flink runtime in OneInputStreamTask (here
> > > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>)
> > > > > and in MultiInputSortingDataInput (here
> > > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>).
> > > > > We plan to re-use the same code/mechanism to do sorting.
> > > > >
> > > > > It is possible that spilling to disks might cause larger overhead. IMO
> > > > > it is an orthogonal issue already existing in Flink. This is because a
> > > > > Flink job running in batch mode might also be slower than in stream
> > > > > mode for the same reason. However, even though it is possible in
> > > > > theory, I expect that in practice the throughput of using sorting +
> > > > > BatchExecutionKeyedStateBackend should be much higher than using other
> > > > > keyed state backends when the amount of data is large. As a matter of
> > > > > fact, we have not heard complaints about such performance regression
> > > > > issues in batch mode.
> > > > >
> > > > > The primary goal of this FLIP is to allow the operator to run at the
> > > > > same throughput (in stream mode when there is backlog) as it currently
> > > > > can in batch mode. And this goal is not affected by the disk overhead
> > > > > issue mentioned above.
> > > > >
> > > > > I am thinking maybe we can treat it as an orthogonal performance
> > > > > optimization problem instead of solving this problem in this FLIP?
> > > > >
> > > > > >     - If it will be at least partially in-memory, does Flink have
> > > > > >       some mechanism to reserve optional memory that can be revoked
> > > > > >       if a new operator starts up? Can this memory be redistributed?
> > > > > >       Ideally we should use as much as possible of the available
> > > > > >       memory to avoid spilling costs, but also being able to revoke
> > > > > >       that memory
> > > > > >
> > > > >
> > > > > This FLIP does not support dynamically revoking/redistributing
> > > > > managed memory used by the ExternalSorter.
> > > > >
> > > > > For operators with isInternalSorterSupported = true, we will allocate
> > > > > to this operator the execution.sorted-inputs.memory
> > > > > <https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144>
> > > > > amount of managed memory. This is the same as how Flink allocates
> > > > > managed memory to an operator when this operator has keyed inputs in
> > > > > batch mode.
> > > > >
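> > > > > For example, users could size this pool as follows (a sketch; assuming
> > > > > the existing ExecutionOptions field for the config key linked above):
> > > > >
> > > > >     Configuration conf = new Configuration();
> > > > >     // "execution.sorted-inputs.memory"
> > > > >     conf.set(ExecutionOptions.SORTED_INPUTS_MEMORY, MemorySize.ofMebiBytes(128));
> > > > >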
> > > > > Note that this FLIP intends to allow operators to sort inputs whenever
> > > > > there is backlog. And there is currently no way for an operator to
> > > > > know in advance whether there will be no backlog after a given time.
> > > > > So it seems simpler to just keep the managed memory for such an
> > > > > operator throughout the lifecycle of this operator, for now.
> > > > >
> > > > > Besides, it seems that the lack of the ability to dynamically
> > > > > revoke/redistribute unused managed memory is an existing issue in
> > > > > Flink. For example, we might have two operators sharing the same slot
> > > > > where both operators use managed memory (e.g. to sort inputs). There
> > > > > is currently no way for one operator to re-use the memory not used by
> > > > > the other operator.
> > > > >
> > > > > Therefore, I think we can treat this as an orthogonal performance
> > > > > optimization problem which can be addressed separately. What do you
> > > > > think?
> > > > >
> > > > >
> > > > > >     - Sometimes sorting, even if we have memory to do that, might
> > > > > >       be an unnecessary overhead.
> > > > > > - Watermarks
> > > > > >     - Is holding back watermarks always good? If we have tons of
> > > > > >       data buffered/sorted and waiting to be processed with multiple
> > > > > >       windows per key and many different keys, then when we switch
> > > > > >       back to `isBacklog=false` we first process all of that data
> > > > > >       before processing watermarks. For operators that are not using
> > > > > >       sorted input, the state size can explode significantly,
> > > > > >       causing lots of problems. Even for those that can use sorting,
> > > > > >       switching to sorting or BatchExecutionKeyedStateBackend is not
> > > > > >       always a good idea, but keeping RocksDB also can be risky.
> > > > > >
> > > > >
> > > > > With the current FLIP, the proposal is to use a sorter only when the
> > > > > inputs have keys. According to this practice, operators which are not
> > > > > using sorting should have un-keyed inputs. I believe such an operator
> > > > > will not even use a keyed state backend. Maybe I missed some use-case.
> > > > > Can you provide a use-case where we will have an operator with
> > > > > un-keyed inputs whose state size can explode because we hold back
> > > > > watermarks?
> > > > >
> > > > > For operators with keyed inputs that use sorting, I suppose it is
> > > > > possible that sorting + BatchExecutionKeyedStateBackend can be worse
> > > > > than using RocksDB. But I believe this is very, very rare (if possible
> > > > > at all) in practical usage of Flink.
> > > > >
> > > > > To take one step back: if this indeed causes a regression for a real
> > > > > use-case, the user can set
> > > > > execution.checkpointing.interval-during-backlog to anything other than
> > > > > 0 so that this FLIP will not use the sorter +
> > > > > BatchExecutionKeyedStateBackend even when there is backlog.
> > > > >
> > > > > I would hope we can find a way to automatically determine whether
> > > > > using sorting + BatchExecutionKeyedStateBackend would be better or
> > > > > worse than using RocksDB alone. But I could not find a good and
> > > > > reliable way to do this. Maybe we can update Flink accordingly once we
> > > > > find a good way to do this in the future?
> > > > >
> > > > >
> > > > >
> > > > > > - Keyed state backend
> > > > > >     - I think you haven't described what happens during switching
> > > > > >       from streaming to backlog processing.
> > > > > >
> > > > >
> > > > > Good point. This indeed needs to be described. I added a TODO in the
> > > > > "Behavior changes ..." section to describe what happens when isBacklog
> > > > > switches from false to true, for the watermark/checkpoint/state
> > > > > backend etc.
> > > > >
> > > > > Let me explain this for the state backend here for now. I will update
> > > > > the FLIP later.
> > > > >
> > > > > When isBacklog switches from false to true, an operator with keyed
> > > > > inputs can optionally (as determined by its implementation) start to
> > > > > use an internal sorter to sort inputs by key, without processing
> > > > > inputs or updating the state backend, until it receives end-of-input
> > > > > or isBacklog switches to false again.
> > > > >
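> > > > > For illustration, the input path of such an operator could look
> > > > > roughly like this (a sketch; internalSorter, processRecord and
> > > > > isBacklog are hypothetical names, not the final API):
> > > > >
> > > > >     public void processElement(StreamRecord<IN> record) throws Exception {
> > > > >         if (isBacklog) {
> > > > >             // Backlog phase: buffer/sort records instead of processing
> > > > >             // them eagerly; the state backend is not updated here.
> > > > >             internalSorter.add(record.getValue());
> > > > >         } else {
> > > > >             // Normal stream-mode path.
> > > > >             processRecord(record.getValue());
> > > > >         }
> > > > >     }
> > > > >
> > > > >     // When isBacklog switches back to false (or at end-of-input),
> > > > >     // drain the sorted records before resuming normal processing.
> > > > >     private void drainSortedRecords() throws Exception {
> > > > >         for (IN value : internalSorter.getSortedValues()) {
> > > > >             processRecord(value);
> > > > >         }
> > > > >     }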
> > > > >
> > > > >
> > > > > >     - Switch can be an unnecessary overhead.
> > > > >
> > > > >
> > > > > I agree it can cause unnecessary overhead, particularly when isBacklog
> > > > > switches back and forth frequently. Whether or not this is unnecessary
> > > > > likely depends on the duration/throughput of the backlog phase as well
> > > > > as the specific computation logic of the operator. I am not sure there
> > > > > is a good way for Flink to determine in advance whether switching is
> > > > > unnecessary.
> > > > >
> > > > > Note that for the existing use-cases where we expect to change
> > > > > isBacklog to true (e.g. the MySQL CDC snapshot phase, the Kafka source
> > > > > watermark lag being too high), we don't expect it to switch back and
> > > > > forth frequently. And users can disable this switch by setting
> > > > > execution.checkpointing.interval-during-backlog to anything other than
> > > > > 0.
> > > > >
> > > > > Therefore, I am wondering if we can also view this as a performance
> > > > > optimization opportunity for extra use-cases in the future, rather
> > > > > than a blocking issue of this FLIP for the MVP use-case (e.g. the
> > > > > snapshot phase for any CDC source, Kafka watermark lag).
> > > > >
> > > > >
> > > > > > At the same time, in your current proposal, for
> > > > > > `execution.checkpointing.interval-during-backlog > 0` we won't
> > > > > > switch to "batch" mode at all. That's a bit of a shame; I don't
> > > > > > understand why those two things should be coupled together?
> > > > > >
> > > > >
> > > > > We can in general classify optimizations as those that are compatible
> > > > > with checkpointing, and those that are not compatible with
> > > > > checkpointing. For example, input sorting is currently not compatible
> > > > > with checkpointing. And buffering input records to reduce state
> > > > > backend overhead (and probably columnar processing for mini-batch in
> > > > > the future) is compatible with checkpointing.
> > > > >
> > > > > The primary goal of FLIP-327 is to support optimizations that are not
> > > > > compatible with checkpointing. If
> > > > > execution.checkpointing.interval-during-backlog > 0, which means that
> > > > > the user intends to still do checkpointing even when there is backlog,
> > > > > then we will not be able to support such optimizations.
> > > > >
> > > > > For optimizations that are compatible with checkpointing, we can do
> > > > > this even when the operator does not run in "batch mode". There are
> > > > > extra problems to solve in order to achieve this optimization, such as
> > > > > supporting unaligned checkpointing without prolonging its sync phase.
> > > > > I plan to explain how this can be done in FLIP-325.
> > > > >
> > > > >
> > > > > > All in all, shouldn't we aim for some more clever process of
> > > > > > switching back and forth between streaming/batch modes for the
> > > > > > watermark strategy/state backend/sorting, based on some metrics?
> > > > > > Trying to either predict if switching might help, or trying to
> > > > > > estimate if the last switch was beneficial? Maybe something along
> > > > > > the lines of:
> > > > > > - sort only in memory and during sorting count the number of
> > > > > >   distinct keys (NDK)
> > > > > >     - maybe allow for spilling if so far in memory we have
> > > > > >       NDK * 5 >= #records
> > > > > > - do not allow to buffer records above a certain threshold, as
> > > > > >   otherwise checkpointing can explode
> > > > > > - switch to `BatchExecutionKeyedStateBackend` only if
> > > > > >   NDK * 2 >= #records
> > > > > > - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
> > > > > >
> > > > > > Or even maybe for starters something even simpler, and then test
> > > > > > out something more fancy as a follow up?
> > > > > >
> > > > >
> > > > > I agree it is worth investigating these ideas to further optimize the
> > > > > performance during backlog.
> > > > >
> > > > > I just think these can be done independently after this FLIP. The
> > > > > focus of this FLIP is to re-use in stream mode the same optimization
> > > > > which we already use in batch mode, rather than inventing or improving
> > > > > the performance of these existing optimizations.
> > > > >
> > > > > Given that there are already a lot of new mechanisms/features to
> > > > > discuss and address in this FLIP, I am hoping we can limit the scope
> > > > > of this FLIP to re-using the existing optimization, and pursue these
> > > > > extra optimization opportunities as future work.
> > > > >
> > > > > What do you think?
> > > > >
> > > > >
> > > > > >
> > > > > > At the same time, `execution.checkpointing.interval-during-backlog=0`
> > > > > > seems a weird setting to me, one that I would not feel safe
> > > > > > recommending to anyone. If processing of a backlog takes a long
> > > > > > time, a job might stop making any progress due to some random
> > > > > > failures. This is especially dangerous if a job switches from
> > > > > > streaming mode back to backlog processing for some reason, as that
> > > > > > could happen months after someone started a job with this strange
> > > > > > setting. So should we even have it? I would simply disallow it.
> > > > > >
> > > > >
> > > > > Good point. I do agree we need further work to improve the failover
> > > > > performance in case any task fails.
> > > > >
> > > > > As of the current FLIP, if any task fails during backlog and
> > > > > execution.checkpointing.interval-during-backlog = 0, we will need to
> > > > > restart all operators from the last checkpointed state and continue
> > > > > processing the backlog. And this can be a lot of rollback since there
> > > > > is no checkpoint during backlog. And this can also be worse than batch
> > > > > mode, since this FLIP currently does not support exporting/saving
> > > > > records to local disk (or a shuffle service) so that a failed task can
> > > > > re-consume the records from the upstream task (or shuffle service) in
> > > > > the same way Flink fails over a task in batch mode.
> > > > >
> > > > > I think we can extend this FLIP to solve this problem so that it can
> > > > > have at least the same behavior/performance as a batch-mode job. The
> > > > > idea is to also follow what batch mode does. For example, we can
> > > > > trigger a checkpoint when isBacklog switches to true, and every
> > > > > operator should buffer its output on the TM local disk (or a remote
> > > > > shuffle service). Therefore, after a task fails, it can restart from
> > > > > the last checkpoint and re-consume the data buffered in the upstream
> > > > > task.
> > > > >
> > > > > I will update the FLIP as described above. Would this address your
> > > > > concern?
> > > > >
> > > > >
> > > > >
> > > > > > I could see a power setting like:
> > > > > >         `execution.backlog.use-full-batch-mode-on-start (default false)`
> > > > > >
> > > > >
> > > > > I am not sure I fully understand this config or its motivation. Can
> > > > > you help explain the exact semantics of this config?
> > > > >
> > > > >
> > > > > > that would override any heuristic of switching to backlog if someone
> > > > > > is submitting a new job that starts with `isBacklog=true`.
> > > > > >
> > > > > > Or we could limit the scope of this FLIP to only support starting
> > > > > > with batch mode and switching only once to streaming, and design a
> > > > > > follow up with switching back and forth?
> > > > > >
> > > > >
> > > > > Sure, that sounds good to me. I am happy to split this FLIP into two
> > > > > FLIPs so that we can make incremental progress.
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > >
> > > > > > I'm looking forward to hearing/reading your thoughts.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > >
> > > > > > Wed, 12 Jul 2023 at 12:38 Jing Ge <j...@ververica.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Dong,
> > > > > > >
> > > > > > > Thanks for your reply!
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Jing
> > > > > > >
> > > > > > > On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jing,
> > > > > > > >
> > > > > > > > Thanks for the comments. Please see my reply inline.
> > > > > > > >
> > > > > > > > On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Dong,
> > > > > > > > >
> > > > > > > > > Thanks for the clarification. Now it is clear to me. I have
> > > > > > > > > additional noob questions w.r.t. the internal sorter.
> > > > > > > > >
> > > > > > > > > 1. When should the setter be called to set
> > > > > > > > > internalSorterSupported to true?
> > > > > > > > >
> > > > > > > >
> > > > > > > > The developer of the operator class (i.e. a class that implements
> > > > > > > > `StreamOperator`) should override the `#getOperatorAttributes()`
> > > > > > > > API to set internalSorterSupported to true, if he/she decides to
> > > > > > > > sort records internally in the operator.
> > > > > > > >
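> > > > > > > > For illustration, a minimal sketch (the attribute/builder names
> > > > > > > > are taken from the FLIP and may differ in the final API):
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public OperatorAttributes getOperatorAttributes() {
> > > > > > > >         return new OperatorAttributesBuilder()
> > > > > > > >             .setInternalSorterSupported(true)
> > > > > > > >             .build();
> > > > > > > >     }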
> > > > > > > >
> > > > > > > > > 2.
> > > > > > > > > *"For those operators whose throughput can be considerably
> > > > > > > > > improved with an internal sorter, update it to take advantage
> > > > > > > > > of the internal sorter when its input has isBacklog=true.*
> > > > > > > > > *Typically, operators that involve aggregation operation (e.g.
> > > > > > > > > join, cogroup, aggregate) on keyed inputs can benefit from
> > > > > > > > > using an internal sorter."*
> > > > > > > > >
> > > > > > > > > *"The operator that performs the CoGroup operation will
> > > > > > > > > instantiate two internal sorters to sort records from its two
> > > > > > > > > inputs separately. Then it can pull the sorted records from
> > > > > > > > > these two sorters. This can be done without wrapping input
> > > > > > > > > records with TaggedUnion<...>. In comparison, the existing
> > > > > > > > > DataStream#coGroup needs to wrap input records with
> > > > > > > > > TaggedUnion<...> before sorting them using one external sorter,
> > > > > > > > > which introduces higher overhead."*
> > > > > > > > >
> > > > > > > > > According to the performance test, it seems that the internal
> > > > > > > > > sorter has better performance than the external sorter. Is it
> > > > > > > > > possible to make those operators that can benefit from it use
> > > > > > > > > the internal sorter by default?
> > > > > > > > >
> > > > > > > >
> > > > > > > > Yes, it is possible. After this FLIP is done, users can use
> > > > > > > > DataStream#coGroup with EndOfStreamWindows as the window assigner
> > > > > > > > to co-group two streams in effectively a batch manner. An
> > > > > > > > operator that uses an internal sorter will be used to perform the
> > > > > > > > co-group operation. There is no need for users of the DataStream
> > > > > > > > API to explicitly know or set the internal sorter in any way.
> > > > > > > >
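> > > > > > > > For example, something like this (a sketch; EndOfStreamWindows is
> > > > > > > > the window assigner mentioned above, and the key selectors and
> > > > > > > > MyCoGroupFunction are placeholders):
> > > > > > > >
> > > > > > > >     DataStream<Result> result = stream1
> > > > > > > >         .coGroup(stream2)
> > > > > > > >         .where(record -> record.getKey())
> > > > > > > >         .equalTo(record -> record.getKey())
> > > > > > > >         .window(EndOfStreamWindows.get())
> > > > > > > >         .apply(new MyCoGroupFunction());
> > > > > > > >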
> > > > > > > > In the future, we plan to incrementally optimize other
> > > > > > > > aggregation operations (e.g. aggregate) on the DataStream API
> > > > > > > > when EndOfStreamWindows is used as the window assigner.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Jing
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jing,
> > > > > > > > > >
> > > > > > > > > > Thank you for the comments! Please see my reply inline.
> > > > > > > > > >
> > > > > > > > > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Dong,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the proposal! The FLIP is already in good shape.
> > > > > > > > > > > I have some NIT questions.
> > > > > > > > > > >
> > > > > > > > > > > 1. It is a little bit weird to write the hint right after
> > > > > > > > > > > the motivation that some features have been moved to
> > > > > > > > > > > FLIP-331, because at that point readers don't yet have the
> > > > > > > > > > > context to know which features are meant. I would suggest
> > > > > > > > > > > moving the note to the beginning of the "Public interfaces"
> > > > > > > > > > > section.
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Given that the reviewer who commented on this email thread
> > > > > > > > > > before I refactored the FLIP (i.e. Piotr) has read FLIP-331,
> > > > > > > > > > I think it is simpler to just remove any mention of FLIP-331.
> > > > > > > > > > I have updated the FLIP accordingly.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > 2. It is also a little bit weird to describe all behaviour
> > > > > > > > > > > changes at first but only focus on one single feature, i.e.
> > > > > > > > > > > how to implement internalSorterSupported. TBH, I was lost
> > > > > > > > > > > while I was reading the Public interfaces. Maybe change the
> > > > > > > > > > > FLIP title? Another option could be to write a short summary
> > > > > > > > > > > of all features and point out that this FLIP will only focus
> > > > > > > > > > > on the internalSorterSupported feature. Others could be
> > > > > > > > > > > found in FLIP-331. WDYT?
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Conceptually, the purpose of this FLIP is to allow a stream
> > > > > > > > > > mode job to run parts of the topology in batch mode so that
> > > > > > > > > > it can apply optimizations/computations that cannot be used
> > > > > > > > > > together with checkpointing (and are thus not usable in
> > > > > > > > > > stream mode). Although the internal sorter is the only
> > > > > > > > > > optimization immediately supported in this FLIP, this FLIP
> > > > > > > > > > lays the foundation to support other optimizations in the
> > > > > > > > > > future, such as using a GPU to process a bounded stream of
> > > > > > > > > > records.
> > > > > > > > > >
> > > > > > > > > > Therefore, I find it better to keep the current title rather
> > > > > > > > > > than limiting the scope to the internal sorter. What do you
> > > > > > > > > > think?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > 3. There seems to be a typo at 4) Checkpoint and failover
> > > > > > > > > > > strategy -> Mixed mode ->
> > > > > > > > > > >
> > > > > > > > > > >    - If any task fails when isBacklog=false true, this task
> > > > > > > > > > >      is restarted to re-process its input from the
> > > > > > > > > > >      beginning.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > Thank you for catching this issue. It is fixed now.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best regards
> > > > > > > > > > > Jing
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for your comments! Please see my reply inline.
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have a couple of questions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Could you explain why those properties
> > > > > > > > > > > > >
> > > > > > > > > > > > >     @Nullable private Boolean isOutputOnEOF = null;
> > > > > > > > > > > > >     @Nullable private Boolean isOutputOnCheckpoint = null;
> > > > > > > > > > > > >     @Nullable private Boolean isInternalSorterSupported = null;
> > > > > > > > > > > > >
> > > > > > > > > > > > > must be `@Nullable`, instead of having the default value
> > > > > > > > > > > > > set to `false`?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > By initializing these private variables in
> > > > > > > > > > > > OperatorAttributesBuilder as null, we can implement
> > > > > > > > > > > > `OperatorAttributesBuilder#build()` in such a way that it
> > > > > > > > > > > > prints DEBUG-level logging to say "isOutputOnCheckpoint is
> > > > > > > > > > > > not explicitly set". This can help users/SREs debug
> > > > > > > > > > > > performance issues (or the lack of an expected
> > > > > > > > > > > > optimization) caused by operators not explicitly setting
> > > > > > > > > > > > the right operator attribute.
> > > > > > > > > > > >
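> > > > > > > > > > > > A sketch of what I mean (hypothetical, not the final code):
> > > > > > > > > > > >
> > > > > > > > > > > >     public OperatorAttributes build() {
> > > > > > > > > > > >         if (isOutputOnCheckpoint == null) {
> > > > > > > > > > > >             LOG.debug("isOutputOnCheckpoint is not explicitly set.");
> > > > > > > > > > > >         }
> > > > > > > > > > > >         // A null attribute defaults to false, as described
> > > > > > > > > > > >         // in the Java doc quoted below.
> > > > > > > > > > > >         return new OperatorAttributes(
> > > > > > > > > > > >             isOutputOnEOF != null ? isOutputOnEOF : false,
> > > > > > > > > > > >             isOutputOnCheckpoint != null ? isOutputOnCheckpoint : false,
> > > > > > > > > > > >             isInternalSorterSupported != null
> > > > > > > > > > > >                 ? isInternalSorterSupported
> > > > > > > > > > > >                 : false);
> > > > > > > > > > > >     }
> > > > > > > > > > > >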
> > > > > > > > > > > > For example, we might want a job to always use the longer
> > > > > > > > > > > > checkpointing interval (i.e.
> > > > > > > > > > > > execution.checkpointing.interval-during-backlog) if all
> > > > > > > > > > > > running operators have isOutputOnCheckpoint == false, and
> > > > > > > > > > > > use the short checkpointing interval otherwise. If a user
> > > > > > > > > > > > has explicitly configured
> > > > > > > > > > > > execution.checkpointing.interval-during-backlog but the
> > > > > > > > > > > > two-phase commit sink library has not been upgraded to set
> > > > > > > > > > > > isOutputOnCheckpoint=true, then the job will end up using
> > > > > > > > > > > > the long checkpointing interval, and it will be useful to
> > > > > > > > > > > > figure out what is going wrong in this case by checking
> > > > > > > > > > > > the log.
> > > > > > > > > > > >
> > > > > > > > > > > > Note that the default value of these fields of the
> > > > > > > > > > > > OperatorAttributes instance built by
> > > > > > > > > > > > OperatorAttributesBuilder will still be false. The
> > > > > > > > > > > > following is mentioned in the Java doc of
> > > > > > > > > > > > `OperatorAttributesBuilder#build()`:
> > > > > > > > > > > >
> > > > > > > > > > > >  /**
> > > > > > > > > > > >   * If any operator attribute is null, we will log it at
> > > > > > > > > > > >   * DEBUG level and use the following default values.
> > > > > > > > > > > >   * - isOutputOnEOF defaults to false
> > > > > > > > > > > >   * - isOutputOnCheckpoint defaults to false
> > > > > > > > > > > >   * - isInternalSorterSupported defaults to false
> > > > > > > > > > > >   */
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Second question: have you thought about cases where
> > > > > > > > > > > > > someone is either bootstrapping from a streaming source
> > > > > > > > > > > > > like Kafka, or simply trying to catch up after a long
> > > > > > > > > > > > > period of downtime in a purely streaming job? Generally
> > > > > > > > > > > > > speaking, cases where the user doesn't care about
> > > > > > > > > > > > > latency in the catch-up phase, regardless of whether the
> > > > > > > > > > > > > source is bounded or unbounded, but wants to process the
> > > > > > > > > > > > > data as fast as possible, and then switch dynamically to
> > > > > > > > > > > > > real-time processing?
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Yes, I have thought about this. We should allow such a
> > > > > > > > > > > > job to effectively run in batch mode when the job is in
> > > > > > > > > > > > the catch-up phase. FLIP-327 is actually an important
> > > > > > > > > > > > step toward addressing this use-case.
> > > > > > > > > > > >
> > > > > > > > > > > > In order to address the above use-case, all we need is a
> > > > > > > > > > > > way for the source operator (e.g. Kafka) to tell the
> > > > > > > > > > > > Flink runtime (via IsProcessingBacklog) whether it is in
> > > > > > > > > > > > the catch-up phase.
> > > > > > > > > > > >
> > > > > > > > > > > > Since every Kafka message has an event timestamp, we can
> > > > > > > > > > > > allow users to specify a job-level config such as
> > > > > > > > > > > > backlog-watermark-lag-threshold, and consider a Kafka
> > > > > > > > > > > > source to have IsProcessingBacklog=true if system_time -
> > > > > > > > > > > > watermark > backlog-watermark-lag-threshold. This
> > > > > > > > > > > > effectively allows us to determine whether Kafka is in
> > > > > > > > > > > > the catch-up phase.
> > > > > > > > > > > >
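> > > > > > > > > > > > In pseudo-Java (backlogWatermarkLagThreshold being the
> > > > > > > > > > > > hypothetical job-level config above):
> > > > > > > > > > > >
> > > > > > > > > > > >     long watermarkLag = System.currentTimeMillis() - currentWatermark;
> > > > > > > > > > > >     boolean isProcessingBacklog = watermarkLag > backlogWatermarkLagThreshold;
> > > > > > > > > > > >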
> > > > > > > > > > > > Once we have this capability (I plan to work on this in
> > > > > > > > > > > > FLIP-328), we can directly use the features proposed in
> > > > > > > > > > > > FLIP-325 and FLIP-327 to optimize the above use-case.
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Dong
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > >
> > > > > > > > > > > > > Sun, 2 Jul 2023 at 16:15 Dong Lin <lindon...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I am opening this thread to discuss FLIP-327: Support
> > > > > > > > > > > > > > stream-batch unified operator to improve job
> > > > > > > > > > > > > > throughput when processing backlog data. The design
> > > > > > > > > > > > > > doc can be found at
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > > > > > > > > > > > .
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This FLIP enables a Flink job to initially operate in
> > > > > > > > > > > > > > batch mode, achieving high throughput while processing
> > > > > > > > > > > > > > records that do not require low processing latency.
> > > > > > > > > > > > > > Subsequently, the job can seamlessly transition to
> > > > > > > > > > > > > > stream mode for processing real-time records with low
> > > > > > > > > > > > > > latency. Importantly, the same state can be utilized
> > > > > > > > > > > > > > before and after this mode switch, making it
> > > > > > > > > > > > > > particularly valuable when users wish to bootstrap the
> > > > > > > > > > > > > > job's state using historical data.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We would greatly appreciate any comments or feedback
> > > > > > > > > > > > > > you may have on this proposal.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > >
