Hi Dong,

Thanks for the proposal! The FLIP is already in good shape. I got some NIT
questions.

1. It is a little bit weird to write the hint right after the motivation
that some features have been moved to FLIP-331, because at that time,
readers don't know the context about what features does it mean. I would
suggest moving the note to the beginning of "Public interfaces" sections.
2. It is also a little bit weird to describe all behaviour changes at first
but only focus on one single feature, i.e. how to implement
internalSorterSupported. TBH, I was lost while I was reading the Public
interfaces. Maybe change the FLIP title? Another option could be to write a
short summary of all features and point out that this FLIP will only focus
on the internalSorterSupported feature. Others could be found in FLIP-331.
WDYT?
3. There should be a typo at 4) Checkpoint and failover strategy -> Mixed
mode ->

   - If any task fails when isBacklog=false true, this task is restarted to
   re-process its input from the beginning.



Best regards
Jing


On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your comments! Please see my reply inline.
>
> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hi Dong,
> >
> > I have a couple of questions.
> >
> > Could you explain why those properties
> >
> >     @Nullable private Boolean isOutputOnEOF = null;
> >     @Nullable private Boolean isOutputOnCheckpoint = null;
> >     @Nullable private Boolean isInternalSorterSupported = null;
> >
> > must be `@Nullable`, instead of having the default value set to `false`?
> >
>
> By initializing these private variables in OperatorAttributesBuilder as
> null, we can implement `OperatorAttributesBuilder#build()` in such a way
> that it can print DEBUG level logging to say "isOutputOnCheckpoint is not
> explicitly set". This can help user/SRE debug performance issues (or lack
> of the expected optimization) due to operators not explicitly setting the
> right operator attribute.
>
> For example, we might want a job to always use the longer checkpointing
> interval (i.e. execution.checkpointing.interval-during-backlog) if all
> running operators have isOutputOnCheckpoint==false, and use the short
> checkpointing interval otherwise. If a user has explicitly configured the
> execution.checkpointing.interval-during-backlog but the two-phase commit
> sink library has not been upgraded to set isOutputOnCheckpoint=true, then
> the job will end up using the long checkpointing interval, and it will be
> useful to figure out what is going wrong in this case by checking the log.
>
> Note that the default value of these fields of the OperatorAttributes
> instance built by OperatorAttributesBuilder will still be false. The
> following is mentioned in the Java doc of
> `OperatorAttributesBuilder#build()`:
>
>  /**
>   * If any operator attribute is null, we will log it at DEBUG level and
> use the following
>   * default values.
>   * - isOutputOnEOF defaults to false
>   * - isOutputOnCheckpoint defaults to false
>   * - isInternalSorterSupported defaults to false
>   */
>
>
> >
> > Second question, have you thought about cases where someone is
> > either bootstrapping from a streaming source like Kafka
> > or simply trying to catch up after a long period of downtime in a purely
> > streaming job? Generally speaking a cases where
> > user doesn't care about latency in the catch up phase, regardless if the
> > source is bounded or unbounded, but wants to process
> > the data as fast as possible, and then switch dynamically to real time
> > processing?
> >
>
> Yes, I have thought about this. We should allow this job to effectively run
> in batch mode when the job is in the catch-up phase. FLIP-327 is actually
> an important step toward addressing this use-case.
>
> In order to address the above use-case, all we need is a way for source
> operator (e.g. Kafka) to tell Flink runtime (via IsProcessingBacklog)
> whether it is in the catch-up phase.
>
> Since every Kafka message has event-timestamp, we can allow users to
> specify a job-level config such as backlog-watermark-lag-threshold, and
> consider a Kafka Source to have IsProcessingBacklog=true if system_time -
> watermark > backlog-watermark-lag-threshold. This effectively allows us to
> determine whether Kafka is in the catch up phase.
>
> Once we have this capability (I plan to work on this in FLIP-328), we can
> directly use the features proposed in FLIP-325 and FLIP-327 to optimize the
> above use-case.
>
> What do you think?
>
> Best,
> Dong
>
>
> >
> > Best,
> > Piotrek
> >
> > niedz., 2 lip 2023 o 16:15 Dong Lin <lindon...@gmail.com> napisał(a):
> >
> > > Hi all,
> > >
> > > I am opening this thread to discuss FLIP-327: Support stream-batch
> > unified
> > > operator to improve job throughput when processing backlog data. The
> > design
> > > doc can be found at
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > .
> > >
> > > This FLIP enables a Flink job to initially operate in batch mode,
> > achieving
> > > high throughput while processing records that do not require low
> > processing
> > > latency. Subsequently, the job can seamlessly transition to stream mode
> > for
> > > processing real-time records with low latency. Importantly, the same
> > state
> > > can be utilized before and after this mode switch, making it
> particularly
> > > valuable when users wish to bootstrap the job's state using historical
> > > data.
> > >
> > > We would greatly appreciate any comments or feedback you may have on
> this
> > > proposal.
> > >
> > > Cheers,
> > > Dong
> > >
> >
>

Reply via email to