Hi Dong,

I have a couple of follow-up questions about switching back and forth
between streaming and batch mode, especially around the shuffle/watermark
strategy and the keyed state backend.

First of all, it might not always be beneficial to switch into batch
mode:
- Shuffle strategy
    - Is sorting going to be purely in-memory? If not, spilling to disk
      might cause larger overheads compared to not sorting the records.
    - If it will be at least partially in-memory, does Flink have some
      mechanism to reserve optional memory that can be revoked if a new
      operator starts up? Can this memory be redistributed? Ideally we
      should use as much of the available memory as possible to avoid
      spilling costs, while still being able to revoke that memory.
    - Sometimes sorting, even if we have the memory to do it, might be an
      unnecessary overhead.
- Watermarks
    - Is holding back watermarks always good? Say we have tons of data
      buffered/sorted and waiting to be processed, with multiple windows
      per key and many different keys. When we switch back to
      `isBacklog=false`, we first process all of that data before
      processing watermarks. For operators that are not using sorted
      input, the state size can explode, causing lots of problems. Even
      for those that can use sorting, switching to sorting or
      BatchExecutionKeyedStateBackend is not always a good idea, but
      keeping RocksDB can also be risky.
- Keyed state backend
    - I think you haven't described what happens when switching from
      streaming to backlog processing.
    - The switch itself can be an unnecessary overhead.

At the same time, in your current proposal, for
`execution.checkpointing.interval-during-backlog > 0` we won't switch to
"batch" mode at all. That's a bit of a shame; I don't understand why
those two things should be coupled together.

All in all, shouldn't we aim for a more clever process of switching back
and forth between streaming/batch modes for the watermark strategy, state
backend and sorting, based on some metrics? Either trying to predict
whether switching might help, or trying to estimate whether the last
switch was beneficial? Maybe something along these lines:
- sort only in memory, and during sorting count the number of distinct
  keys (NDK)
    - maybe allow spilling if so far in memory we have NDK * 5 >= #records
- do not allow buffering records above a certain threshold, as otherwise
  checkpointing can explode
- switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 >= #records
- do not sort if the last NDK (or an EMA of NDK?) * 1.5 <= #records
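
To make this concrete, here is a rough Java sketch of the rules above. It
is only an illustration: `BacklogSwitchHeuristics`, its method names and
the `maxBufferedRecords` cap are made up for this email and exist neither
in Flink nor in the FLIP, the thresholds are the ad-hoc numbers from the
list, and I read the last rule as NDK * 1.5 <= #records.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch; none of these names exist in Flink or in the FLIP. */
public class BacklogSwitchHeuristics {

    // Keys seen so far in the in-memory sort buffer; its size is the NDK.
    private final Set<Object> distinctKeys = new HashSet<>();
    // Assumed hard cap on buffered records; the value is up to the caller.
    private final long maxBufferedRecords;
    private long records;

    public BacklogSwitchHeuristics(long maxBufferedRecords) {
        this.maxBufferedRecords = maxBufferedRecords;
    }

    /** Called for every record added to the in-memory sort buffer. */
    public void onRecord(Object key) {
        distinctKeys.add(key);
        records++;
    }

    public long ndk() {
        return distinctKeys.size();
    }

    public long records() {
        return records;
    }

    /** Spilling is allowed only while NDK * 5 >= #records. */
    public boolean maySpill() {
        return ndk() * 5 >= records;
    }

    /** Buffering must stop once the cap is reached. */
    public boolean mayBufferMore() {
        return records < maxBufferedRecords;
    }

    /** Use the batch keyed state backend only if NDK * 2 >= #records. */
    public boolean useBatchStateBackend() {
        return ndk() * 2 >= records;
    }

    /**
     * Skip sorting if the previous NDK (or an EMA of it) * 1.5 <= #records.
     * This direction of the inequality is an interpretation of the list above.
     */
    public static boolean skipSorting(double previousNdk, long records) {
        return previousNdk * 1.5 <= records;
    }
}
```

An operator would call `onRecord` while filling its sort buffer and consult
the boolean methods before spilling or before picking a state backend.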

Or maybe, for starters, something even simpler, and then test out
something fancier as a follow-up?

At the same time, `execution.checkpointing.interval-during-backlog=0`
seems like a weird setting to me, one that I would not feel safe
recommending to anyone. If processing a backlog takes a long time, a job
might stop making any progress due to some random failures. This is
especially dangerous if a job switches from streaming mode back to
backlog processing for some reason, as that could happen months after
someone started the job with this strange setting. So should we even have
it? I would simply disallow it. I could see a power setting like:
        `execution.backlog.use-full-batch-mode-on-start (default false)`
that would override any heuristic for switching to backlog processing if
someone is submitting a new job that starts with `isBacklog=true`.

Or we could limit the scope of this FLIP to only support starting in
batch mode and switching only once to streaming, and design a follow-up
for switching back and forth?

I'm looking forward to hearing your thoughts.

Best,
Piotrek


On Wed, 12 Jul 2023 at 12:38, Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Dong,
>
> Thanks for your reply!
>
> Best regards,
> Jing
>
> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Jing,
> >
> > Thanks for the comments. Please see my reply inline.
> >
> > On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the clarification. Now it is clear to me. I have some
> > > additional noob questions wrt the internal sorter.
> > >
> > > 1. When should the setter be called to set internalSorterSupported
> > > to true?
> > >
> >
> > Developers of the operator class (i.e. those classes which implement
> > `StreamOperator`) should override the `#getOperatorAttributes()` API
> > to set internalSorterSupported to true if they decide to sort records
> > internally in the operator.
> >
> >
> > > 2.
> > > *"For those operators whose throughput can be considerably improved
> > > with an internal sorter, update it to take advantage of the internal
> > > sorter when its input has isBacklog=true.*
> > > *Typically, operators that involve aggregation operation (e.g. join,
> > > cogroup, aggregate) on keyed inputs can benefit from using an
> > > internal sorter."*
> > >
> > > *"The operator that performs CoGroup operation will instantiate two
> > > internal sorter to sorts records from its two inputs separately.
> > > Then it can pull the sorted records from these two sorters. This can
> > > be done without wrapping input records with TaggedUnion<...>. In
> > > comparison, the existing DataStream#coGroup needs to wrap input
> > > records with TaggedUnion<...> before sorting them using one external
> > > sorter, which introduces higher overhead."*
> > >
> > > According to the performance test, it seems that the internal sorter
> > > has better performance than the external sorter. Is it possible to
> > > make those operators that can benefit from it use the internal
> > > sorter by default?
> > >
> >
> > Yes, it is possible. After this FLIP is done, users can use
> > DataStream#coGroup with EndOfStreamWindows as the window assigner to
> > co-group two streams, effectively in a batch manner. An operator that
> > uses an internal sorter will be used to perform the co-group
> > operation. There is no need for users of the DataStream API to
> > explicitly know about or set the internal sorter in any way.
> >
> > In the future, we plan to incrementally optimize other aggregation
> > operations (e.g. aggregate) on the DataStream API when
> > EndOfStreamWindows is used as the window assigner.
> >
> > Best,
> > Dong
> >
> >
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Jing,
> > > >
> > > > Thank you for the comments! Please see my reply inline.
> > > >
> > > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid>
> > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for the proposal! The FLIP is already in good shape. I
> > > > > have some nit questions.
> > > > >
> > > > > 1. It is a little bit weird to write the hint that some features
> > > > > have been moved to FLIP-331 right after the motivation, because
> > > > > at that point readers don't have the context to know which
> > > > > features are meant. I would suggest moving the note to the
> > > > > beginning of the "Public interfaces" section.
> > > > >
> > > >
> > > > Given that the reviewer who commented on this email thread before
> > > > I refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is
> > > > simpler to just remove any mention of FLIP-331. I have updated the
> > > > FLIP accordingly.
> > > >
> > > >
> > > > > 2. It is also a little bit weird to describe all behaviour
> > > > > changes at first but then focus on one single feature, i.e. how
> > > > > to implement internalSorterSupported. TBH, I was lost while
> > > > > reading the Public interfaces. Maybe change the FLIP title?
> > > > > Another option could be to write a short summary of all features
> > > > > and point out that this FLIP will only focus on the
> > > > > internalSorterSupported feature. Others could be found in
> > > > > FLIP-331. WDYT?
> > > > >
> > > >
> > > > Conceptually, the purpose of this FLIP is to allow a stream mode
> > > > job to run parts of the topology in batch mode so that it can
> > > > apply optimizations/computations that cannot be used together with
> > > > checkpointing (and are thus not usable in stream mode). Although
> > > > the internal sorter is the only optimization immediately supported
> > > > in this FLIP, this FLIP lays the foundation to support other
> > > > optimizations in the future, such as using a GPU to process a
> > > > bounded stream of records.
> > > >
> > > > Therefore, I find it better to keep the current title rather than
> > > > limiting the scope to the internal sorter. What do you think?
> > > >
> > > >
> > > >
> > > > > 3. There seems to be a typo at 4) Checkpoint and failover
> > > > > strategy -> Mixed mode ->
> > > > >
> > > > >    - If any task fails when isBacklog=false true, this task is
> > > > >    restarted to re-process its input from the beginning.
> > > > >
> > > > >
> > > > Thank you for catching this issue. It is fixed now.
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > >
> > > > >
> > > > >
> > > > > Best regards
> > > > > Jing
> > > > >
> > > > >
> > > > > On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hi Piotr,
> > > > > >
> > > > > > Thanks for your comments! Please see my reply inline.
> > > > > >
> > > > > > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski
> > > > > > <piotr.nowoj...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Dong,
> > > > > > >
> > > > > > > I have a couple of questions.
> > > > > > >
> > > > > > > Could you explain why those properties
> > > > > > >
> > > > > > >     @Nullable private Boolean isOutputOnEOF = null;
> > > > > > >     @Nullable private Boolean isOutputOnCheckpoint = null;
> > > > > > >     @Nullable private Boolean isInternalSorterSupported = null;
> > > > > > >
> > > > > > > must be `@Nullable`, instead of having the default value set
> > > > > > > to `false`?
> > > > > > >
> > > > > >
> > > > > > By initializing these private variables in
> > > > > > OperatorAttributesBuilder as null, we can implement
> > > > > > `OperatorAttributesBuilder#build()` in such a way that it can
> > > > > > print DEBUG level logging to say "isOutputOnCheckpoint is not
> > > > > > explicitly set". This can help users/SREs debug performance
> > > > > > issues (or the lack of an expected optimization) due to
> > > > > > operators not explicitly setting the right operator attribute.
> > > > > >
> > > > > > For example, we might want a job to always use the longer
> > > > > > checkpointing interval (i.e.
> > > > > > execution.checkpointing.interval-during-backlog) if all running
> > > > > > operators have isOutputOnCheckpoint==false, and use the short
> > > > > > checkpointing interval otherwise. If a user has explicitly
> > > > > > configured execution.checkpointing.interval-during-backlog but
> > > > > > the two-phase commit sink library has not been upgraded to set
> > > > > > isOutputOnCheckpoint=true, then the job will end up using the
> > > > > > long checkpointing interval, and it will be useful to figure
> > > > > > out what is going wrong in this case by checking the log.
> > > > > >
> > > > > > Note that the default value of these fields of the
> > > > > > OperatorAttributes instance built by OperatorAttributesBuilder
> > > > > > will still be false. The following is mentioned in the Java doc
> > > > > > of `OperatorAttributesBuilder#build()`:
> > > > > >
> > > > > >  /**
> > > > > >   * If any operator attribute is null, we will log it at DEBUG
> > > > > >   * level and use the following default values.
> > > > > >   * - isOutputOnEOF defaults to false
> > > > > >   * - isOutputOnCheckpoint defaults to false
> > > > > >   * - isInternalSorterSupported defaults to false
> > > > > >   */
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Second question: have you thought about cases where someone
> > > > > > > is either bootstrapping from a streaming source like Kafka,
> > > > > > > or simply trying to catch up after a long period of downtime
> > > > > > > in a purely streaming job? Generally speaking, cases where
> > > > > > > the user doesn't care about latency in the catch-up phase,
> > > > > > > regardless of whether the source is bounded or unbounded, but
> > > > > > > wants to process the data as fast as possible and then
> > > > > > > switch dynamically to real-time processing?
> > > > > > >
> > > > > >
> > > > > > Yes, I have thought about this. We should allow this job to
> > > > > > effectively run in batch mode when the job is in the catch-up
> > > > > > phase. FLIP-327 is actually an important step toward addressing
> > > > > > this use-case.
> > > > > >
> > > > > > In order to address the above use-case, all we need is a way
> > > > > > for the source operator (e.g. Kafka) to tell the Flink runtime
> > > > > > (via IsProcessingBacklog) whether it is in the catch-up phase.
> > > > > >
> > > > > > Since every Kafka message has an event timestamp, we can allow
> > > > > > users to specify a job-level config such as
> > > > > > backlog-watermark-lag-threshold, and consider a Kafka Source to
> > > > > > have IsProcessingBacklog=true if system_time - watermark >
> > > > > > backlog-watermark-lag-threshold. This effectively allows us to
> > > > > > determine whether Kafka is in the catch-up phase.
> > > > > >
> > > > > > Once we have this capability (I plan to work on this in
> > > > > > FLIP-328), we can directly use the features proposed in
> > > > > > FLIP-325 and FLIP-327 to optimize the above use-case.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > > On Sun, 2 Jul 2023 at 16:15, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I am opening this thread to discuss FLIP-327: Support
> > > > > > > > stream-batch unified operator to improve job throughput
> > > > > > > > when processing backlog data. The design doc can be found
> > > > > > > > at
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > > > > > .
> > > > > > > >
> > > > > > > > This FLIP enables a Flink job to initially operate in
> > > > > > > > batch mode, achieving high throughput while processing
> > > > > > > > records that do not require low processing latency.
> > > > > > > > Subsequently, the job can seamlessly transition to stream
> > > > > > > > mode for processing real-time records with low latency.
> > > > > > > > Importantly, the same state can be utilized before and
> > > > > > > > after this mode switch, making it particularly valuable
> > > > > > > > when users wish to bootstrap the job's state using
> > > > > > > > historical data.
> > > > > > > >
> > > > > > > > We would greatly appreciate any comments or feedback you
> > > > > > > > may have on this proposal.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Dong
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
