Hi Piotr,

Thanks for the reply!

On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi,
>
> Sorry for the long delay in responding!
>
> >  Given that it is an optional feature that can be
> > turned off by users, it might be OK to just let users try it out and we
> can
> > fix performance issues once we detect any of them. What do you think?
>
> I think it's fine. It would be best to mark this feature as experimental,
> and
> we say that the config keys or the default values might change in the
> future.
>

In general I agree we can mark APIs that determine "whether to enable
dynamic switching between stream/batch mode" as experimental.

However, I am not sure we have such an API yet. The APIs added in this FLIP
are intended to be used by operator developers rather than end users. End
users can enable this capability by setting
execution.checkpointing.interval-during-backlog = Long.MAX and uses a
source which might implicitly set backlog statu (e.g. HybridSource). So
execution.checkpointing.interval-during-backlog is the only user-facing
APIs that can always control whether this feature can be used.

However, execution.checkpointing.interval-during-backlog itself is not tied
to FLIP-327.

Do you mean we should set checkpointing.interval-during-backlog as
experimental? Alternatively, how about we add a doc for
checkpointing.interval-during-backlog explaining its impact/concern as
discussed above?

Best,
Dong


> > Maybe we can revisit the need for such a config when we introduce/discuss
> > the capability to switch backlog from false to true in the future. What
> do
> > you think?
>
> Sure, we can do that.
>
> Best,
> Piotrek
>
> niedz., 23 lip 2023 o 14:32 Dong Lin <lindon...@gmail.com> napisał(a):
>
> > Hi Piotr,
> >
> > Thanks a lot for the explanation. Please see my reply inline.
> >
> > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks a lot for the answers. I can now only briefly answer your last
> > > email.
> > >
> > > > It is possible that spilling to disks might cause larger overhead.
> IMO
> > it
> > > > is an orthogonal issue already existing in Flink. This is because a
> > Flink
> > > > job running batch mode might also be slower than its throughput in
> > stream
> > > > mode due to the same reason.
> > >
> > > Yes, I know, but the thing that worries me is that previously only a
> user
> > > alone
> > > could decide whether to use batch mode or streaming, and in practice
> one
> > > user would rarely (if ever) use both for the same problem/job/query. If
> > his
> > > intention was to eventually process live data, he was using streaming
> > even
> > > if there was a large backlog at the start (apart of some very few very
> > > power
> > > users).
> > >
> > > With this change, we want to introduce a mode that would be switching
> > back
> > > and forth between streaming and "batch in streaming" automatically. So
> a
> > > potential performance regression would be much more visible and painful
> > > at the same time. If batch query runs slower then it could, it's kind
> of
> > > fine as
> > > it will end at some point. If streaming query during large back
> pressure
> > > maybe
> > > temporary load spike switches to batch processing, that's a bigger
> deal.
> > > Especially if batch processing mode will not be able to actually even
> > > handle
> > > the normal load, after the load spike. In that case, the job could
> never
> > > recover
> > > from the backpressure/backlog mode.
> > >
> >
> > I understand you are concerned with the risk of performance regression
> > introduced due to switching to batch mode.
> >
> > After thinking about this more, I think this existing proposal meets the
> > minimum requirement of "not introducing regression for existing jobs".
> The
> > reason is that even if batch mode can be slower than stream mode for some
> > operators in some cases, this is an optional feature that will only be
> > enabled if a user explicitly overrides the newly introduced config to
> > non-default values. Existing jobs that simply upgrade their Flink library
> > version will not suffer any performance regression.
> >
> > More specifically, in order to switch to batch mode, users will need to
> > explicitly set execution.checkpointing.interval-during-backlog to 0. And
> > users can always explicitly update
> > execution.checkpointing.interval-during-backlog to turn off the batch
> mode
> > if that incurs any performance issue.
> >
> > As far as I can tell, for all practical workloads we see in production
> > jobs, batch mode is always faster (w.r.t. throughput) than stream mode
> when
> > there is a high backlog of incoming records. Though it is still
> > theoretically possible, it should be very rare (if any) for batch mode to
> > be slower in practice. Given that it is an optional feature that can be
> > turned off by users, it might be OK to just let users try it out and we
> can
> > fix performance issues once we detect any of them. What do you think?
> >
> >
> > >
> > > > execution.backlog.use-full-batch-mode-on-start (default false)
> > >
> > > ops sorry, it was supposed to be sth like:
> > >
> > > execution.backlog.use-batch-mode-only-on-start (default false)
> > >
> > > That option would disallow switching from streaming to batch. Batch
> mode
> > > would be allowed only to get rid of the initial, present on start-up
> > > backlog.
> > >
> > > Would allow us to safely experiment with switching from streaming to
> > batch
> > > and I would be actually more fine in enabling "using batch mode on
> start"
> > > by default, until we gain confidence and feedback that switching back &
> > > forth
> > > is working as expected.
> > >
> >
> > Now I understand what you are suggesting. I agree that it is necessary
> for
> > users to be able to disallow switching from streaming to batch.
> >
> > I am not sure it is necessary to introduce an extra config just for this
> > purpose. The reason is that we don't have any strategy that switches
> > backlog status from false to true yet. And when we have such strategy
> (e.g.
> > FLIP-328) in the future, it is very likely that we will introduce extra
> > config(s) for users to explicitly turn on such a feature. That means user
> > should be able to turn off this feature even if we don't have something
> > like execution.backlog.use-batch-mode-only-on-start.
> >
> > Maybe we can revisit the need for such a config when we introduce/discuss
> > the capability to switch backlog from false to true in the future. What
> do
> > you think?
> >
> >
> > >
> > > >> Or we could limit the scope of this FLIP to only support starting
> with
> > > >> batch mode and switching only once to
> > > >> streaming, and design a follow up with switching back and forth?
> > > >
> > > > Sure, that sounds good to me. I am happy to split this FLIP into two
> > > FLIPs
> > > > so that we can make incremental progress.
> > >
> > > Great, let's do that. In a follow up FLIP we could restart the
> discussion
> > > about
> > > switching back and forth.
> > >
> >
> > Cool, I added the following statement to the motivation section.
> >
> > "NOTE: this FLIP focuses only on the capability to switch from batch to
> > stream mode. If there is any extra API needed to support switching from
> > stream to batch mode, we will discuss them in a follow-up FLIP."
> >
> > I am looking forward to reading your follow-up thoughts!
> >
> > Best,
> > Dong
> >
> >
> > > Piotrek
> > >
> > > czw., 20 lip 2023 o 16:57 Dong Lin <lindon...@gmail.com> napisał(a):
> > >
> > > > Hi Piotr,
> > > >
> > > > Thank you for the very detailed comments! Please see my reply inline.
> > > >
> > > > On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <
> > > piotr.nowoj...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > I have a couple of follow up questions about switching back and
> forth
> > > > > between streaming and batching mode.
> > > > > Especially around shuffle/watermark strategy, and keyed state
> > backend.
> > > > >
> > > > > First of all, it might not always be beneficial to switch into the
> > > batch
> > > > > modes:
> > > > > - Shuffle strategy
> > > > >     - Is sorting going to be purely in-memory? If not, obviously
> > > spilling
> > > > > to disks might cause larger overheads
> > > > >        compared to not sorting the records.
> > > > >
> > > >
> > > > Sorting might require spilling data to disk depending on the input
> > size.
> > > > The behavior of sorting w.r.t. memory/disk is expected to be exactly
> > the
> > > > same as the behavior of input sorting automatically performed by
> Flink
> > > > runtime in batch mode for keyed inputs.
> > > >
> > > > More specifically, ExternalSorter
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java
> > > > >
> > > > is
> > > > currently used to sort keyed inputs in batch mode. It is
> automatically
> > > used
> > > > by Flink runtime in OneInputStreamTask (here
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114
> > > > >)
> > > > and in MultiInputSortingDataInput (here
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188
> > > > >).
> > > > We plan to re-use the same code/mechanism to do sorting.
> > > >
> > > > It is possible that spilling to disks might cause larger overhead.
> IMO
> > it
> > > > is an orthogonal issue already existing in Flink. This is because a
> > Flink
> > > > job running batch mode might also be slower than its throughput in
> > stream
> > > > mode due to the same reason. However, even though it is possible in
> > > theory,
> > > > I expect that in practice the throughput of using sorting +
> > > > BatchExecutionKeyedStateBackend should be much higher than using
> other
> > > > keyed statebackends when the amount of data is large. As a matter of
> > > fact,
> > > > we have not heard of complaints of such performance regression issues
> > in
> > > > batch mode.
> > > >
> > > > The primary goal of this FLIP is to allow the operator to run at the
> > same
> > > > throughput (in stream mode when there is backlog) as it can currently
> > do
> > > in
> > > > batch mode. And this goal is not affected by the disk overhead issue
> > > > mentioned above.
> > > >
> > > > I am thinking maybe we can treat it as an orthogonal performance
> > > > optimization problem instead of solving this problem in this FLIP?
> > > >
> > > >     - If it will be at least partially in-memory, does Flink have
> some
> > > > > mechanism to reserve optional memory that
> > > > >       can be revoked if a new operator starts up? Can this memory
> be
> > > > > redistributed? Ideally we should use as
> > > > >       much as possible of the available memory to avoid spilling
> > costs,
> > > > but
> > > > > also being able to revoke that memory
> > > > >
> > > >
> > > > This FLIP does not support dynamically revoking/redistribuitng
> managed
> > > > memory used by the ExternalSorter.
> > > >
> > > > For operators with isInternalSorterSupported = true, we will allocate
> > to
> > > > this operator execution.sorted-inputs.memory
> > > > <
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144
> > > > >
> > > > amount of managed memory. This is the same as how Flink allocates
> > managed
> > > > memory to an operator when this operator has keyed inputs in batch
> > mode.
> > > >
> > > > Note that this FLIP intends to support operators to sort inputs
> > whenever
> > > > there is backlog. And there is currently no way for an operator to
> know
> > > in
> > > > advance whether there will be no backlog after a given time. So it
> > seems
> > > > simpler to just keep managed memory for such an operator throughout
> the
> > > > lifecycle of this operator, for now.
> > > >
> > > > Besides, it seems that the lack of ability to dynamically
> > > > revoke/redistribute un-used managed memory is an existing issue in
> > Flink.
> > > > For example, we might have two operators sharing the same slot and
> > these
> > > > two operators both use managed memory (e.g. to sort inputs). There is
> > > > currently no way for one operator to re-use the memory not used by
> the
> > > > other operator.
> > > >
> > > > Therefore, I think we can treat this as an orthogonal performance
> > > > optimization problem which can be addressed separately. What do you
> > > think?
> > > >
> > > >
> > > > >     - Sometimes sorting, even if we have memory to do that, might
> be
> > an
> > > > > unnecessary overhead.
> > > > > - Watermarks
> > > > >     - Is holding back watermarks always good? If we have tons of
> data
> > > > > buffered/sorted and waiting to be processed
> > > > >        with multiple windows per key and many different keys. When
> we
> > > > > switch back to `isBacklog=false` we
> > > > >        first process all of that data before processing watermarks,
> > for
> > > > > operators that are not using sorted input the
> > > > >        state size can explode significantly causing lots of
> problems.
> > > > Even
> > > > > for those that can use sorting, switching to
> > > > >        sorting or BatchExecutionKeyedStateBackend is not always a
> > good
> > > > > idea, but keeping RocksDB also can be
> > > > >        risky.
> > > > >
> > > >
> > > > With the current FLIP, the proposal is to use a sorter only when the
> > > inputs
> > > > have keys. According to this practice, operators which are not using
> > > > sorting should have un-keyed inputs. I believe such an operator will
> > not
> > > > even use a keyed state backend. Maybe I missed some use-case. Can you
> > > > provide a use-case where we will have an operator with un-keyed
> inputs
> > > > whose state size can explode due to we holding back watermarks?
> > > >
> > > > For operators with keyed inputs that use sorting, I suppose it is
> > > possible
> > > > that sorting + BatchExecutionKeyedStateBackend can be worse than
> using
> > > > RocksDB. But I believe this is very very rare (if possible) in almost
> > > > practical usage of Flink.
> > > >
> > > > Take one step back, if this indeed cause regression for a real
> > use-case,
> > > > user can set execution.checkpointing.interval-during-backlog to
> > anything
> > > > other than 0 so that this FLIP will not use
> > > > sorter + BatchExecutionKeyedStateBackend even even when there is
> > backlog.
> > > >
> > > > I would hope we can find a way to automatically determine whether
> using
> > > > sorting + BatchExecutionKeyedStateBackend can be better or worse than
> > > using
> > > > RocksDB alone. But I could not find a good and reliable way to do
> this.
> > > > Maybe we can update Flink to do this when we find a good way to do
> this
> > > in
> > > > the future?
> > > >
> > > >
> > > >
> > > > > - Keyed state backend
> > > > >     - I think you haven't described what happens during switching
> > from
> > > > > streaming to backlog processing.
> > > > >
> > > >
> > > > Good point. This indeed needs to be described. I added a TODO in the
> > > > "Behavior changes ..." section to describe what happens when
> isBacklog
> > > > switches from false to true, for all
> watermark/checkpoint/statebackend
> > > etc.
> > > >
> > > > Let me explain this for the state backend here for now. I will update
> > > FLIP
> > > > later.
> > > >
> > > > When isBacklog switches from false to true, operator with keyed
> inputs
> > > can
> > > > optionally (as determined by its implementation) starts to use
> internal
> > > > sorter to sort inputs by key, without processing inputs or updating
> > > > statebackend, until it receives end-of-inputs or isBacklog is
> switched
> > to
> > > > false again.
> > > >
> > > >
> > > >
> > > > >     - Switch can be an unnecessary overhead.
> > > >
> > > >
> > > > I agree it can cause unnecessary overhead, particularly when
> isBacklog
> > > > switches back and forth frequently. Whether or not this is
> unnecessary
> > > > likely depends on the duration/throughput of the backlog phase as
> well
> > as
> > > > the specific computation logic of the operator. I am not sure there
> is
> > a
> > > > good way for Flink to determine in advance whether switching is
> > > > unnecessary.
> > > >
> > > > Note that for the existing use-case where we expect to change
> isBacklog
> > > to
> > > > true (e.g. MySQL CDC snapshot phase, Kafka source watermark lag being
> > too
> > > > high), we don't expect the watermark to switch back and force
> > frequently.
> > > > And user can disable this switch by setting
> > > > execution.checkpointing.interval-during-backlog to anything other
> than
> > 0.
> > > >
> > > > Therefore, I am wondering if we can also view this as a performance
> > > > optimization opportunity for extra use-cases in the future, rather
> > than a
> > > > blocking issue of this FLIP for the MVP use-case (e.g. snapshot phase
> > for
> > > > any CDC source, Kafka watermark lag).
> > > >
> > > >
> > > > > At the same time, in your current proposal, for
> > > > > `execution.checkpointing.interval-during-backlog > 0` we won't
> > > > > switch to "batch" mode at all. That's a bit of shame, I don't
> > > understand
> > > > > why those two things should be coupled
> > > > > together?
> > > > >
> > > >
> > > > We can in general classify optimizations as those that are compatible
> > > with
> > > > checkpointing, and those that are not compatible with checkpointing.
> > For
> > > > example, input sorting is currently not compatible with
> checkpointing.
> > > And
> > > > buffering input records to reduce state backend overhead (and
> probably
> > > > columnar processing for mini-batch in the future) is compatible with
> > > > checkpointing.
> > > >
> > > > The primary of FLIP-327 is to support optimizations not compatible
> with
> > > > checkpointing. If execution.checkpointing.interval-during-backlog >
> 0,
> > > > which means that user intends to still do checkpointing even when
> there
> > > is
> > > > backog, then we will not be able to support such optimizations.
> > > >
> > > > For optimizations that are compatible with checkpointing, we can do
> > this
> > > > even when the operator does not run in "batch mode". There are extra
> > > > problems to solve in order to achieve this optimization, such as
> > > supporting
> > > > unaligned checkpointing without prolonging its sync phase. I plan to
> > > > explain how this can be done in FLIP-325.
> > > >
> > > >
> > > > > All in all, shouldn't we aim for some more clever process of
> > switching
> > > > back
> > > > > and forth between streaming/batch modes
> > > > > for watermark strategy/state backend/sorting based on some metrics?
> > > > Trying
> > > > > to either predict if switching might help,
> > > > > or trying to estimate if the last switch was beneficial? Maybe
> > > something
> > > > > along the lines:
> > > > > - sort only in memory and during sorting count the number of
> distinct
> > > > keys
> > > > > (NDK)
> > > > >     - maybe allow for spilling if so far in memory we have NDK * 5
> >=
> > > > > #records
> > > > > - do not allow to buffer records above a certain threshold, as
> > > otherwise
> > > > > checkpointing can explode
> > > > > - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 >=
> > > #records
> > > > > - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
> > > > >
> > > > > Or even maybe for starters something even simpler and then test out
> > > > > something more fancy as a follow up?
> > > > >
> > > >
> > > > I agree it is worth investigating these ideas to further optimize the
> > > > performance during backlog.
> > > >
> > > > I just think these can be done independently after this FLIP. The
> focus
> > > of
> > > > this FLIP is to re-use in stream mode the same optimization which we
> > > > already use in batch mode, rather than inventing or improving the
> > > > performance of these existing optimizations.
> > > >
> > > > Given that there are already a lot of new mechanism/features to
> discuss
> > > and
> > > > address in this FLIP, I am hoping we can limit the scope of this FLIP
> > to
> > > > re-use the existing optimization, and do these extra optimization
> > > > opportunities as future work.
> > > >
> > > > What do you think?
> > > >
> > > >
> > > > >
> > > > > At the same time,
> `execution.checkpointing.interval-during-backlog=0`
> > > > seems
> > > > > a weird setting to me, that I would
> > > > > not feel safe recommending to anyone. If processing of a backlog
> > takes
> > > a
> > > > > long time, a job might stop making
> > > > > any progress due to some random failures. Especially dangerous if a
> > job
> > > >
> > > > switches from streaming mode back to
> > > > > backlog processing due to some reasons, as that could happen months
> > > after
> > > > > someone started a job with this
> > > > > strange setting. So should we even have it? I would simply disallow
> > > it. I
> > > > >
> > > >
> > > > Good point. I do agree we need to further work to improve the
> failover
> > > > performance in case any task fails.
> > > >
> > > > As of the current FLIP, if any task fails during backlog and
> > > > execution.checkpointing.interval-during-backlog = 0, we will need to
> > > > restart all operators to the last checkpointed state and continue
> > > > processing backlog. And this can be a lot of rollback since there is
> no
> > > > checkpoint during backlog. And this can also be worse than batch
> since
> > > this
> > > > FLIP currently does not support exporting/saving records to local
> disk
> > > (or
> > > > shuffle service) so that a failed task can re-consume the records
> from
> > > the
> > > > upstream task (or shuffle service) in the same way as how Flink
> > failover
> > > a
> > > > task in batch mode.
> > > >
> > > > I think we can extend this FLIP to solve this problem so that it can
> > have
> > > > at least the same behavior/performance as batch-mode job. The idea is
> > to
> > > > also follow what batch mode does. For example, we can trigger a
> > > checkpoint
> > > > when isBacklog switches to true, and every operator should buffer its
> > > > output in the TM local disk (or remote shuffle service). Therefore,
> > > after a
> > > > task fails, it can restart from the last checkpoint and re-consume
> data
> > > > buffered in the upstream task.
> > > >
> > > > I will update FLIP as described above. Would this address your
> concern?
> > > >
> > > >
> > > >
> > > > > could see a power setting like:
> > > > >         `execution.backlog.use-full-batch-mode-on-start (default
> > > false)`
> > > > >
> > > >
> > > > I am not sure I fully understand this config or its motivation. Can
> you
> > > > help explain the exact semantics of this config?
> > > >
> > > >
> > > > > that would override any heuristic of switching to backlog if
> someone
> > is
> > > > > submitting a new job that starts with
> > > > > `isBacklog=true`.
> > > > >
> > > > > Or we could limit the scope of this FLIP to only support starting
> > with
> > > > > batch mode and switching only once to
> > > > > streaming, and design a follow up with switching back and forth?
> > > > >
> > > >
> > > > Sure, that sounds good to me. I am happy to split this FLIP into two
> > > FLIPs
> > > > so that we can make incremental progress.
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > >
> > > > > I'm looking forwards to hearing/reading out your thoughts.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > >
> > > > > śr., 12 lip 2023 o 12:38 Jing Ge <j...@ververica.com.invalid>
> > > > napisał(a):
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > Thanks for your reply!
> > > > > >
> > > > > > Best regards,
> > > > > > Jing
> > > > > >
> > > > > > On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi Jing,
> > > > > > >
> > > > > > > Thanks for the comments. Please see my reply inline.
> > > > > > >
> > > > > > > On Wed, Jul 12, 2023 at 5:04 AM Jing Ge
> > <j...@ververica.com.invalid
> > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Dong,
> > > > > > > >
> > > > > > > > Thanks for the clarification. Now it is clear for me. I got
> > > > > additional
> > > > > > > noob
> > > > > > > > questions wrt the internal sorter.
> > > > > > > >
> > > > > > > > 1. when to call setter to set the internalSorterSupported to
> be
> > > > true?
> > > > > > > >
> > > > > > >
> > > > > > > Developer of the operator class (i.e. those classes which
> > > implements
> > > > > > > `StreamOperator`) should override the
> `#getOperatorAttributes()`
> > > API
> > > > to
> > > > > > set
> > > > > > > internalSorterSupported to true, if he/she decides to sort
> > records
> > > > > > > internally in the operator.
> > > > > > >
> > > > > > >
> > > > > > > > 2
> > > > > > > > *"For those operators whose throughput can be considerably
> > > improved
> > > > > > with
> > > > > > > an
> > > > > > > > internal sorter, update it to take advantage of the internal
> > > sorter
> > > > > > when
> > > > > > > > its input has isBacklog=true.*
> > > > > > > > *Typically, operators that involve aggregation operation
> (e.g.
> > > > join,
> > > > > > > > cogroup, aggregate) on keyed inputs can benefit from using an
> > > > > internal
> > > > > > > > sorter."*
> > > > > > > >
> > > > > > > > *"The operator that performs CoGroup operation will
> instantiate
> > > two
> > > > > > > > internal sorter to sorts records from its two inputs
> > separately.
> > > > Then
> > > > > > it
> > > > > > > > can pull the sorted records from these two sorters. This can
> be
> > > > done
> > > > > > > > without wrapping input records with TaggedUnion<...>. In
> > > > comparison,
> > > > > > the
> > > > > > > > existing DataStream#coGroup needs to wrap input records with
> > > > > > > > TaggedUnion<...> before sorting them using one external
> sorter,
> > > > which
> > > > > > > > introduces higher overhead."*
> > > > > > > >
> > > > > > > > According to the performance test, it seems that internal
> > sorter
> > > > has
> > > > > > > better
> > > > > > > > performance than external sorter. Is it possible to make
> those
> > > > > > operators
> > > > > > > > that can benefit from it use internal sorter by default?
> > > > > > > >
> > > > > > >
> > > > > > > Yes, it is possible. After this FLIP is done, users can use
> > > > > > > DataStream#coGroup with EndOfStreamWindows as the window
> assigner
> > > to
> > > > > > > co-group two streams in effectively the batch manner. An
> operator
> > > > that
> > > > > > uses
> > > > > > > an internal sorter will be used to perform the co-group
> > operation.
> > > > > There
> > > > > > is
> > > > > > > no need for users of the DataStream API to explicitly know or
> set
> > > the
> > > > > > > internal sorter in anyway.
> > > > > > >
> > > > > > > In the future, we plan to incrementally optimize other
> > aggregation
> > > > > > > operation (e.g. aggregate) on the DataStream API when
> > > > > EndOfStreamWindows
> > > > > > is
> > > > > > > used as the window assigner.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Jing
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jing,
> > > > > > > > >
> > > > > > > > > Thank you for the comments! Please see my reply inline.
> > > > > > > > >
> > > > > > > > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge
> > > > <j...@ververica.com.invalid
> > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Dong,
> > > > > > > > > >
> > > > > > > > > > Thanks for the proposal! The FLIP is already in good
> > shape. I
> > > > got
> > > > > > > some
> > > > > > > > > NIT
> > > > > > > > > > questions.
> > > > > > > > > >
> > > > > > > > > > 1. It is a little bit weird to write the hint right after
> > the
> > > > > > > > motivation
> > > > > > > > > > that some features have been moved to FLIP-331, because
> at
> > > that
> > > > > > time,
> > > > > > > > > > readers don't know the context about what features does
> it
> > > > mean.
> > > > > I
> > > > > > > > would
> > > > > > > > > > suggest moving the note to the beginning of "Public
> > > interfaces"
> > > > > > > > sections.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Given that the reviewer who commented on this email thread
> > > > before I
> > > > > > > > > refactored the FLIP (i.e. Piotr) has read FLP-331, I think
> it
> > > is
> > > > > > > simpler
> > > > > > > > to
> > > > > > > > > just remove any mention of FLIP-331. I have updated the
> FLIP
> > > > > > > accordingly.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 2. It is also a little bit weird to describe all
> behaviour
> > > > > changes
> > > > > > at
> > > > > > > > > first
> > > > > > > > > > but only focus on one single feature, i.e. how to
> implement
> > > > > > > > > > internalSorterSupported. TBH, I was lost while I was
> > reading
> > > > the
> > > > > > > Public
> > > > > > > > > > interfaces. Maybe change the FLIP title? Another option
> > could
> > > > be
> > > > > to
> > > > > > > > > write a
> > > > > > > > > > short summary of all features and point out that this
> FLIP
> > > will
> > > > > > only
> > > > > > > > > focus
> > > > > > > > > > on the internalSorterSupported feature. Others could be
> > found
> > > > in
> > > > > > > > > FLIP-331.
> > > > > > > > > > WDYT?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Conceptually, the purpose of this FLIP is to allow a stream
> > > mode
> > > > > job
> > > > > > to
> > > > > > > > run
> > > > > > > > > parts of the topology in batch mode so that it can apply
> > > > > > > > > optimizations/computations that can not be used together
> with
> > > > > > > > checkpointing
> > > > > > > > > (and thus not usable in stream mode). Although internal
> > sorter
> > > is
> > > > > the
> > > > > > > > only
> > > > > > > > > optimization immediately supported in this FLIP, this FLIP
> > lays
> > > > the
> > > > > > > > > foundation to support other optimizations in the future,
> such
> > > as
> > > > > > using
> > > > > > > > GPU
> > > > > > > > > to process a bounded stream of records.
> > > > > > > > >
> > > > > > > > > Therefore, I find it better to keep the current title
> rather
> > > than
> > > > > > > > limiting
> > > > > > > > > the scope to internal sorter. What do you think?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 3. There should be a typo at 4) Checkpoint and failover
> > > > strategy
> > > > > ->
> > > > > > > > Mixed
> > > > > > > > > > mode ->
> > > > > > > > > >
> > > > > > > > > >    - If any task fails when isBacklog=false true, this
> task
> > > is
> > > > > > > > restarted
> > > > > > > > > to
> > > > > > > > > >    re-process its input from the beginning.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > Thank you for catching this issue. It is fixed now.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best regards
> > > > > > > > > > Jing
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Piotr,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your comments! Please see my reply inline.
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <
> > > > > > > > > piotr.nowoj...@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > >
> > > > > > > > > > > > I have a couple of questions.
> > > > > > > > > > > >
> > > > > > > > > > > > Could you explain why those properties
> > > > > > > > > > > >
> > > > > > > > > > > >     @Nullable private Boolean isOutputOnEOF = null;
> > > > > > > > > > > >     @Nullable private Boolean isOutputOnCheckpoint =
> > > null;
> > > > > > > > > > > >     @Nullable private Boolean
> > isInternalSorterSupported =
> > > > > null;
> > > > > > > > > > > >
> > > > > > > > > > > > must be `@Nullable`, instead of having the default
> > value
> > > > set
> > > > > to
> > > > > > > > > > `false`?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > By initializing these private variables in
> > > > > > > OperatorAttributesBuilder
> > > > > > > > as
> > > > > > > > > > > null, we can implement
> > `OperatorAttributesBuilder#build()`
> > > in
> > > > > > such
> > > > > > > a
> > > > > > > > > way
> > > > > > > > > > > that it can print DEBUG level logging to say
> > > > > > "isOutputOnCheckpoint
> > > > > > > is
> > > > > > > > > not
> > > > > > > > > > > explicitly set". This can help user/SRE debug
> performance
> > > > > issues
> > > > > > > (or
> > > > > > > > > lack
> > > > > > > > > > > of the expected optimization) due to operators not
> > > explicitly
> > > > > > > setting
> > > > > > > > > the
> > > > > > > > > > > right operator attribute.
> > > > > > > > > > >
> > > > > > > > > > > For example, we might want a job to always use the
> longer
> > > > > > > > checkpointing
> > > > > > > > > > > interval (i.e.
> > > > execution.checkpointing.interval-during-backlog)
> > > > > > if
> > > > > > > > all
> > > > > > > > > > > running operators have isOutputOnCheckpoint==false, and
> > use
> > > > the
> > > > > > > short
> > > > > > > > > > > checkpointing interval otherwise. If a user has
> > explicitly
> > > > > > > configured
> > > > > > > > > the
> > > > > > > > > > > execution.checkpointing.interval-during-backlog but the
> > > > > two-phase
> > > > > > > > > commit
> > > > > > > > > > > sink library has not been upgraded to set
> > > > > > > isOutputOnCheckpoint=true,
> > > > > > > > > then
> > > > > > > > > > > the job will end up using the long checkpointing
> > interval,
> > > > and
> > > > > it
> > > > > > > > will
> > > > > > > > > be
> > > > > > > > > > > useful to figure out what is going wrong in this case
> by
> > > > > checking
> > > > > > > the
> > > > > > > > > > log.
> > > > > > > > > > >
> > > > > > > > > > > Note that the default value of these fields of the
> > > > > > > OperatorAttributes
> > > > > > > > > > > instance built by OperatorAttributesBuilder will still
> be
> > > > > false.
> > > > > > > The
> > > > > > > > > > > following is mentioned in the Java doc of
> > > > > > > > > > > `OperatorAttributesBuilder#build()`:
> > > > > > > > > > >
> > > > > > > > > > >  /**
> > > > > > > > > > >   * If any operator attribute is null, we will log it
> at
> > > > DEBUG
> > > > > > > level
> > > > > > > > > and
> > > > > > > > > > > use the following
> > > > > > > > > > >   * default values.
> > > > > > > > > > >   * - isOutputOnEOF defaults to false
> > > > > > > > > > >   * - isOutputOnCheckpoint defaults to false
> > > > > > > > > > >   * - isInternalSorterSupported defaults to false
> > > > > > > > > > >   */
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Second question, have you thought about cases where
> > > someone
> > > > > is
> > > > > > > > > > > > either bootstrapping from a streaming source like
> Kafka
> > > > > > > > > > > > or simply trying to catch up after a long period of
> > > > downtime
> > > > > > in a
> > > > > > > > > > purely
> > > > > > > > > > > > streaming job? Generally speaking a cases where
> > > > > > > > > > > > user doesn't care about latency in the catch up
> phase,
> > > > > > regardless
> > > > > > > > if
> > > > > > > > > > the
> > > > > > > > > > > > source is bounded or unbounded, but wants to process
> > > > > > > > > > > > the data as fast as possible, and then switch
> > dynamically
> > > > to
> > > > > > real
> > > > > > > > > time
> > > > > > > > > > > > processing?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Yes, I have thought about this. We should allow this
> job
> > to
> > > > > > > > effectively
> > > > > > > > > > run
> > > > > > > > > > > in batch mode when the job is in the catch-up phase.
> > > FLIP-327
> > > > > is
> > > > > > > > > actually
> > > > > > > > > > > an important step toward addressing this use-case.
> > > > > > > > > > >
> > > > > > > > > > > In order to address the above use-case, all we need is
> a
> > > way
> > > > > for
> > > > > > > > source
> > > > > > > > > > > operator (e.g. Kafka) to tell Flink runtime (via
> > > > > > > IsProcessingBacklog)
> > > > > > > > > > > whether it is in the catch-up phase.
> > > > > > > > > > >
> > > > > > > > > > > Since every Kafka message has event-timestamp, we can
> > allow
> > > > > users
> > > > > > > to
> > > > > > > > > > > specify a job-level config such as
> > > > > > backlog-watermark-lag-threshold,
> > > > > > > > and
> > > > > > > > > > > consider a Kafka Source to have
> IsProcessingBacklog=true
> > if
> > > > > > > > > system_time -
> > > > > > > > > > > watermark > backlog-watermark-lag-threshold. This
> > > effectively
> > > > > > > allows
> > > > > > > > us
> > > > > > > > > > to
> > > > > > > > > > > determine whether Kafka is in the catch up phase.
> > > > > > > > > > >
> > > > > > > > > > > Once we have this capability (I plan to work on this in
> > > > > > FLIP-328),
> > > > > > > we
> > > > > > > > > can
> > > > > > > > > > > directly use the features proposed in FLIP-325 and
> > FLIP-327
> > > > to
> > > > > > > > optimize
> > > > > > > > > > the
> > > > > > > > > > > above use-case.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Dong
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Piotrek
> > > > > > > > > > > >
> > > > > > > > > > > > niedz., 2 lip 2023 o 16:15 Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > > > > napisał(a):
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am opening this thread to discuss FLIP-327:
> Support
> > > > > > > > stream-batch
> > > > > > > > > > > > unified
> > > > > > > > > > > > > operator to improve job throughput when processing
> > > > backlog
> > > > > > > data.
> > > > > > > > > The
> > > > > > > > > > > > design
> > > > > > > > > > > > > doc can be found at
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > > > > > > > > > > .
> > > > > > > > > > > > >
> > > > > > > > > > > > > This FLIP enables a Flink job to initially operate
> in
> > > > batch
> > > > > > > mode,
> > > > > > > > > > > > achieving
> > > > > > > > > > > > > high throughput while processing records that do
> not
> > > > > require
> > > > > > > low
> > > > > > > > > > > > processing
> > > > > > > > > > > > > latency. Subsequently, the job can seamlessly
> > > transition
> > > > to
> > > > > > > > stream
> > > > > > > > > > mode
> > > > > > > > > > > > for
> > > > > > > > > > > > > processing real-time records with low latency.
> > > > Importantly,
> > > > > > the
> > > > > > > > > same
> > > > > > > > > > > > state
> > > > > > > > > > > > > can be utilized before and after this mode switch,
> > > making
> > > > > it
> > > > > > > > > > > particularly
> > > > > > > > > > > > > valuable when users wish to bootstrap the job's
> state
> > > > using
> > > > > > > > > > historical
> > > > > > > > > > > > > data.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We would greatly appreciate any comments or
> feedback
> > > you
> > > > > may
> > > > > > > have
> > > > > > > > > on
> > > > > > > > > > > this
> > > > > > > > > > > > > proposal.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Dong
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to