Thanks for all of the updates! I'm looking forward to trying out those
improvements.

Best,
Piotrek

pon., 18 wrz 2023 o 09:33 Xintong Song <tonysong...@gmail.com> napisał(a):

> Thanks for addressing my comments.
>
> LGTM
>
> Best,
>
> Xintong
>
>
>
> On Mon, Sep 18, 2023 at 3:10 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Xintong,
> >
> > Thank you for all the comments. Please see my reply inline.
> >
> >
> > On Mon, Sep 18, 2023 at 11:31 AM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> > > Thanks for addressing my comments, Dong.
> > >
> > > The expected behavior of checkpointing and failover depends on whether
> > > > there is any operator currently running in the job with all its
> inputs'
> > > > isBacklog=true. If there exists such an operator and
> > > > interval-during-backlog = 0, then checkpoint will be disabled and the
> > > > operator will have to failover in a way similar to batch mode.
> > >
> > >
> > > This makes sense to me. Shall we also put this into the FLIP. Or maybe
> > you
> > > already did that and I overlooked it? The current description in "4)
> > > Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to
> me.
> > > It says "At the point when isBacklog switches to false, source operator
> > > ...", which sounds like upon any source operator switching to
> isBacklog =
> > > false.
> > >
> >
> >
> > I think it is kind of mentioned in the doc
> > of execution.checkpointing.interval-during-backlog, which says "if it is
> > not null and any source reports isProcessingBacklog=true, it is the
> > interval...".
> >
> > Based on this doc, we can derive that if there is one operator reporting
> > isBacklog=true, then the checkpointing interval is determined by
> > interval-during-backlog, which in this case has value 0 indicating that
> > checkpoint triggering is disabled.
> >
> > Given that other readers might also have this question, I have updated
> the
> > FLIP-327 with the following statement to make it more explicit: "For jobs
> > with multiple sources and
> execution.checkpointing.interval-during-backlog =
> > 0, checkpoint triggering is enabled if and only if all sources have
> > isBacklog=false".
> >
> >
> >
> > > I am not sure what is the concern with having `flink-streaming-java`
> > depend
> > > > on `flink-runtime`. Can you clarify the exact concern?
> > > >
> > >
> > > The concern here is that an API module should not depend on a runtime
> > > module. Currently, we have the "user codes -> flink-streaming-java ->
> > > flink-runtime" dependency chain, which makes binary compatibility
> > > impossible because any runtime changes can break the compatibility
> with a
> > > user jar (which bundles flink-streaming-java) compiled for an older
> > > version. Ideally, we want the runtime module to depend on the API
> module,
> > > rather than the other way around. This is one of the issues we are
> trying
> > > to resolve with the programmatic API refactor. However, the way we are
> > > trying to resolve it is to introduce another API module and gradually
> > > replace the current DataStream API / flink-streaming-java, which means
> > > flink-streaming-java will stay depending on flink-runtime for a while
> > > anyway. So the concern here is minor, only about we might need more
> > effort
> > > when reworking this with the new API.
> > >
> >
> > Thanks for the detailed explanation. Given that we plan to avoid having
> > flink-streaming-java depend on flink-runtime, I agree it is preferred to
> > avoid introducing more dependencies like this.
> >
> > I have updated the FLIP to let RecordAttributes extend StreamElement.
> >
> > Best,
> > Dong
> >
> >
> > > The rest of your replies make sense to me.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Fri, Sep 15, 2023 at 10:05 PM Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi Xintong,
> > > >
> > > > Thanks for your comments! Please see my reply inline.
> > > >
> > > > On Thu, Sep 14, 2023 at 4:58 PM Xintong Song <tonysong...@gmail.com>
> > > > wrote:
> > > >
> > > > > Sorry to join the discussion late.
> > > > >
> > > > > Overall, I think it's a good idea to support dynamically switching
> > the
> > > > > operator algorithms between Streaming (optimized towards low
> latency
> > +
> > > > > checkpointing supports) and Batch (optimized towards throughput).
> > This
> > > is
> > > > > indeed a big and complex topic, and I really appreciate the
> previous
> > > > > discussions that narrow the scope of this FLIP down to only
> > considering
> > > > > switching from Batch to Streaming as a first step.
> > > > >
> > > > > I have several questions.
> > > > >
> > > > > 1. The FLIP discusses various behaviors under 4 scenarios:
> streaming
> > > > mode,
> > > > > batch mode, mixed mode with checkpoint interval > 0, mixed mode
> with
> > > > > checkpoint interval = 0. IIUC, this is because many batch
> > optimizations
> > > > > cannot be supported together with checkpointing. This justifies
> that
> > in
> > > > > mixed mode with interval > 0, most behaviors are the same as in
> > > streaming
> > > > > mode. However, mixed mode with checkpoint interval = 0 does not
> > always
> > > > > necessarily mean we should apply such optimization. It is possible
> > that
> > > > in
> > > > > some cases (likely with small data amounts) the cost of such
> > > > optimizations
> > > > > are higher than the benefit. Therefore, I'd suggest decoupling the
> > > > concept
> > > > > of applying these optimizations (i.e., the batch execution phase in
> > the
> > > > > mixed mode) from whether checkpointing is enabled or not. In
> > > particular,
> > > > > I'd suggest removing the scenario "mixed mode with
> > > > > e.c.interval-during-backlog > 0", changing the scenario "mixed mode
> > > with
> > > > > e.c.interval-during-backlog = 0" to simply "mixed mode", and say
> that
> > > can
> > > > > have different strategies for deciding whether to enable the mixed
> > mode
> > > > and
> > > > > as the first step the strategy is to enable it when
> > > > > e.c.interval-during-backlog = 0.
> > > > >
> > > >
> > > > Thanks for the detailed explanation!
> > > >
> > > > I have updated the "Behavior changes when switching from batch mode
> to
> > > > stream mode" section with the following changes.
> > > >
> > > > 1) Remove the description of "mixed mode with
> interval-during-backlog >
> > > 0"
> > > > and add the statement saying that "after this FLIP, the behavior of
> > Flink
> > > > runtime with execution.runtime-mode = streaming AND
> > > > execution.checkpointing.interval-during-backlog > 0, will be same as
> > the
> > > > stream mode prior to this FLIP"
> > > >
> > > > 2) Add the statement saying that "Mixed mode refers to the behavior
> of
> > > > Flink runtime after this FLIP with execution.runtime-mode = streaming
> > AND
> > > > execution.checkpointing.interval-during-backlog = 0".
> > > >
> > > > 3) Add the statement saying that "It is possible for mixed mode to be
> > > > slower than stream mode, particularly when there is only small amount
> > of
> > > > input records and the overhead of buffering/sorting inputs out-weight
> > its
> > > > benefit. This is similar to how the merge join might be slower than
> > hash
> > > > join. This FLIP focuses on optimizing the Flink throughput when there
> > is
> > > a
> > > > high number of input records. In the future, we might introduce more
> > > > strategies to turn on mix mode in a smart way to avoid performance
> > > > regression".
> > > >
> > > > Would this address your concern?
> >
> > >
> > > >
> > > > >
> > > > > 2. According to the FLIP, before isBacklog = false, the timer
> service
> > > > only
> > > > > keeps timers for the current key. It also says upon the end of each
> > > key,
> > > > it
> > > > > fires timers of the key up to the last watermark. IIUC, that means
> > not
> > > > all
> > > > > timers are guaranteed to be fired. It is possible that some timers
> > are
> > > > left
> > > > > to be triggered after isBacklog switching to false. If the timer
> > > service
> > > > > only keeps timers for the current key, those not-fired timers may
> get
> > > > lost
> > > > > when switching to a new key.
> > > > >
> > > >
> > > > Thanks for catching this. You are right that all timers should be
> fired
> > > as
> > > > long as the corresponding firing condition (either processing-time or
> > > > event-time) is satisfied.
> > > >
> > > > I have updated the "Timer Service" part of the "Behavior changes when
> > > > switching from batch mode to stream mode" section accordingly. Can
> you
> > > see
> > > > if it addresses your concern?
> > > >
> > > >
> > > > >
> > > > > 3. Is it possible that some sources / operators in the job switch
> to
> > > > > isBacklog = false, while others are still isBacklog = true? In that
> > > case,
> > > > > what is the expected behavior for checkpointing and failover?
> > > > >
> > > >
> > > > Yes, it is possible. And in this case, Flink runtime will handle this
> > > > operator as if all the operator's inputs have isBacklog=false. In
> > > > particular, Flink runtime will not automatically sort inputs of this
> > > > operator.
> > > >
> > > > I added the following statement in the FLIP to clarify the behavior:
> > "For
> > > > an operator with 2+ inputs, where some inputs have isBacklog=true and
> > > some
> > > > other inputs have isBacklog=false, Flink runtime will handle this
> > > operator
> > > > as if all its inputs have isBacklog=false".
> > > >
> > > > The expected behavior of checkpointing and failover depends on
> whether
> > > > there is any operator currently running in the job with all its
> inputs'
> > > > isBacklog=true. If there exists such an operator
> > > > and interval-during-backlog = 0, then checkpoint will be disabled and
> > the
> > > > operator will have to failover in a way similar to batch mode.
> > > >
> > > >
> > > >
> > > > >
> > > > > 4. Do we require RecordAttributes to be properly handled by all
> > > > operators?
> > > > > Or do we consider it as hints that operators may benefit from
> looking
> > > > into
> > > > > it but should not run into any problems ignoring it? I'm asking
> > > because,
> > > > if
> > > > > they are required to be properly handled, we probably need a way to
> > > > enforce
> > > > > operators to deal with it.
> > `processRecordAttributes(RecordAttributes)`
> > > > > might not be a good fit because we don't know whether the operator
> > has
> > > > > looked into all necessary fields of `RecordAttributes`.
> > > > >
> > > >
> > > > As of this FLIP, we would not require RecordAttributes to be handled
> > any
> > > > operator in order to achieve correctness. So it is more like a hint.
> > More
> > > > specifically, the isBacklog attribute provides a hint for an operator
> > to
> > > > optionally delay the processing of its inputs if doing so can improve
> > its
> > > > throughput.
> > > >
> > > > I think it would be useful to avoid requiring operators to explicitly
> > > > handling attributes contained in RecordAttributes. This is because we
> > > want
> > > > the features added in this FLIP (and future FLIPs) to be backward
> > > > compatible without breaking the correctness of existing jobs.
> > > >
> > > > Suppose we really need to add a record attribute that should be
> > > explicitly
> > > > handled by every operator, I believe we can always find a way to
> > enforce
> > > > this requirement (e.g. fail job compilation with proper error) in the
> > > > future. For example, we can add a method such as
> > > handleXXXRecodAttribute()
> > > > in the operator interface without default implementation.
> > > >
> > > >
> > > > >
> > > > > 5. I wonder if there's any strong reasons behind choosing
> > > `RuntimeEvent`
> > > > > over `StreamElement` for `RecordAttributes` to extend? My concern
> is
> > > > that,
> > > > > the current approach introduces one more dependency from
> > > > > `flink-streaming-java` (operators that uses `RecordAttributes`) to
> > > > > `flink-runtime` (where `RuntimeEvent` comes from), which seems to
> be
> > > > > unnecessary.
> > > > >
> > > >
> > > > There is no strong reason to choose `RuntimeEvent` over
> > `StreamElement`.
> > > I
> > > > think the main (and minor) reason for doing so is the simplicity of
> > > > implementation. For example, we don't need to add methods such
> > > > as StreamElement#isRecordAttributes and
> > StreamElement#asRecordAttributes.
> > > >
> > > > I am not sure what is the concern with having `flink-streaming-java`
> > > depend
> > > > on `flink-runtime`. Can you clarify the exact concern?
> > > >
> > > > In any case, I don't have a strong preference between `RuntimeEvent`
> > over
> > > > `StreamElement`. We can update the FLIP to use `StreamElement` as
> long
> > as
> > > > there is a well-defined non-trivial reason for making this choice.
> > > >
> > > >
> > > > > 6. The FLIP says it leverages state backend optimizations
> introduced
> > in
> > > > > FLIP-325. Just for clarification, does this mean this FLIP is
> > depending
> > > > on
> > > > > FLIP-325, and probably should not be voted / accepted until
> FLIP-325
> > is
> > > > > accepted?
> > > > >
> > > >
> > > > Yes, the proposed implementation of this FLIP depends on FLIP-325. We
> > can
> > > > start voting thread for FLIP-327 after FLIP-325 is accepted. Maybe we
> > can
> > > > continue to discuss this FLIP in the mean time (before FLIP-325 is
> > > > accepted).
> > > >
> > > > Thanks again for the very detailed and helpful review! Looking
> forward
> > to
> > > > your follow-up comments.
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > >
> > > > > Best,
> > > > >
> > > > > Xintong
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Sep 1, 2023 at 12:48 AM Jan Lukavský <je...@seznam.cz>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > some keywords in this triggered my attention, so sorry for late
> > > jumping
> > > > > > in, but I'd like to comprehend the nature of the proposal.
> > > > > >
> > > > > > I'll try to summarize my understanding:
> > > > > >
> > > > > > The goal of the FLIP is to support automatic switching between
> > > > streaming
> > > > > > and batch processing, leveraging the fact that batch processing
> is
> > > more
> > > > > > computationally effective. This makes perfect sense.
> > > > > >
> > > > > > Looking at the streaming vs. batch semantics, switching from
> > > streaming
> > > > > > to batch means the following:
> > > > > >
> > > > > >   a) generally, watermarks are not propagated in batch, watermark
> > > moves
> > > > > > from -inf to +inf in one step, at the end of batch input, this
> > might
> > > > > > (and probably will) skip many invocations of timers
> > > > > >
> > > > > >   b) grouping by key (and window) can be done efficiently,
> because
> > it
> > > > > > can be done by sort-group and ideally parallelized by window
> (with
> > > some
> > > > > > caveats)
> > > > > >
> > > > > > The switch also has some conditions, namely:
> > > > > >
> > > > > >   i) batch mode does not do checkpoints, inputs must be
> accessible
> > > > > > repeatedly (forever)
> > > > > >
> > > > > >   ii) due to failures in batch mode, inputs might be reprocessed
> > and
> > > > > > thus must be immutable or all sub-results computed in all
> branches
> > of
> > > > > > the computation (even possibly unaffected by the failure) have to
> > be
> > > > > > discarded and recomputed from scratch
> > > > > >
> > > > > > Obviously, in case of the switch from batch to streaming, the
> > > property
> > > > > > a) has to be modified so the watermark does not move to +inf, but
> > to
> > > > > > min(streaming watermark). Giving these properties, it should be
> > > > possible
> > > > > > to exchange batch and streaming processing without any
> cooperation
> > > with
> > > > > > the application logic itself. Is my understanding correct?
> > > > > >
> > > > > > If so, there is still one open question to efficiency, though.
> The
> > > > > > streaming operator _might_ need sorting by timestamp (e.g.
> > processing
> > > > > > time-series data, or even sequential data). In that case simply
> > > > > > switching streaming semantics to batch processing does not yield
> > > > > > efficient processing, because the operator still needs to buffer
> > and
> > > > > > manually sort all the input data (batch data is always
> unordered).
> > On
> > > > > > the other hand, the batch runner already does sorting (for
> grouping
> > > by
> > > > > > key), so adding additional sorting criterion is very cheap. In
> > Apache
> > > > > > Beam, we introduced a property of a stateful PTransform (DoFn)
> > called
> > > > > > @RequiresTimeSortedInput [1], which can then be implemented
> > > efficiently
> > > > > > by batch engines.
> > > > > >
> > > > > > Does the FLIP somehow work with conditions i) and ii)? I can
> > imagine
> > > > for
> > > > > > instance that if data is read from say Kafka, then if backlog
> gets
> > > > > > sufficiently large, then even the batch processing can take
> > > substantial
> > > > > > time and if it fails after long processing, some of the original
> > data
> > > > > > might be already rolled out from Kafka topic.
> > > > > >
> > > > > > In the FLIP there are some proposed changes to sources to emit
> > > metadata
> > > > > > about if the records come from backlog. What is the driving line
> of
> > > > > > thoughts why this is needed? In my point of view, streaming
> engines
> > > are
> > > > > > _always_ processing backlog, the only question is "how delayed
> are
> > > the
> > > > > > currently processed events after HEAD", or more specifically in
> > this
> > > > > > case "how many elements can we expect to process if the source
> > would
> > > > > > immediately stop receiving more data?". This should be
> configurable
> > > > > > using simple option defining the difference between current
> > > > > > processing-time (JM) and watermark of the source, or am I missing
> > > > > > something?
> > > > > >
> > > > > > Thanks for clarification and all the best,
> > > > > >
> > > > > >   Jan
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> > > > > >
> > > > > > On 8/31/23 13:17, Xuannan Su wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to share some updates on FLIP-327. Dong and I have
> > > had a
> > > > > > > series of discussions and have made several refinements to the
> > > FLIP.
> > > > > > >
> > > > > > > The major change to the FLIP is to allow the input of the
> > one-input
> > > > > > > operator to be automatically sorted during backlog processing.
> > When
> > > > > > > combined with the state backend optimization introduced in
> > FLIP-325
> > > > > [1],
> > > > > > > all the keyed single-input operators can achieve similar
> > > performance
> > > > as
> > > > > > in
> > > > > > > batch mode during backlog processing without any code change to
> > the
> > > > > > > operator. We also implemented a POC[2] and conducted
> benchmark[3]
> > > > using
> > > > > > the
> > > > > > > KeyedStream#reduce operation. The benchmark results demonstrate
> > the
> > > > > > > performance gains that this FLIP can offer.
> > > > > > >
> > > > > > > I am looking forward to any comments or feedback you may have
> on
> > > this
> > > > > > FLIP.
> > > > > > >
> > > > > > > Best,
> > > > > > > Xuannan
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
> > > > > > > [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
> > > > > > > [3]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >> On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > > > >>
> > > > > > >> Hi Piotr,
> > > > > > >>
> > > > > > >> Thanks for the explanation.
> > > > > > >>
> > > > > > >> To recap our offline discussion, there is a concern regarding
> > the
> > > > > > >> capability to dynamically switch between stream and batch
> modes.
> > > > This
> > > > > > >> concern is around unforeseen behaviors such as bugs or
> > performance
> > > > > > >> regressions, which we might not yet be aware of yet. The
> reason
> > > for
> > > > > this
> > > > > > >> concern is that this feature involves a fundamental impact on
> > the
> > > > > Flink
> > > > > > >> runtime's behavior.
> > > > > > >>
> > > > > > >> Due to the above concern, I agree it is reasonable to annotate
> > > > related
> > > > > > > APIs
> > > > > > >> as experimental. This step would provide us with the
> flexibility
> > > to
> > > > > > modify
> > > > > > >> these APIs if issues arise in the future. This annotation also
> > > > serves
> > > > > > as a
> > > > > > >> note to users that this functionality might not perform well
> as
> > > > > > expected.
> > > > > > >>
> > > > > > >> Though I believe that we can ensure the reliability of this
> > > feature
> > > > > > > through
> > > > > > >> good design and code reviews, comprehensive unit tests, and
> > > thorough
> > > > > > >> integration testing, I agree that it is reasonable to be extra
> > > > > cautious
> > > > > > in
> > > > > > >> this case. Also, it should be OK to delay making these APIs as
> > > > > > >> non-experimental by 1-2 releases.
> > > > > > >>
> > > > > > >> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs
> in
> > > > these
> > > > > > docs
> > > > > > >> as experimental. Please let me know if you think any other API
> > > > should
> > > > > > also
> > > > > > >> be marked as experimental.
> > > > > > >>
> > > > > > >> Thanks!
> > > > > > >> Dong
> > > > > > >>
> > > > > > >> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <
> > > > > > piotr.nowoj...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Hi Dong,
> > > > > > >>>
> > > > > > >>> Operators API is unfortunately also our public facing API
> and I
> > > > mean
> > > > > > the
> > > > > > >>> APIs that we will add there should also be marked
> > `@Experimental`
> > > > > IMO.
> > > > > > >>>
> > > > > > >>> The config options should also be marked as experimental
> (both
> > > > > > >>> annotated @Experimental and noted the same thing in the docs,
> > > > > > >>> if @Experimental annotation is not automatically mentioned in
> > the
> > > > > > docs).
> > > > > > >>>
> > > > > > >>>> Alternatively, how about we add a doc for
> > > > > > >>> checkpointing.interval-during-backlog explaining its
> > > impact/concern
> > > > > as
> > > > > > >>> discussed above?
> > > > > > >>>
> > > > > > >>> We should do this independently from marking the APIs/config
> > > > options
> > > > > as
> > > > > > >>> `@Experimental`
> > > > > > >>>
> > > > > > >>> Best,
> > > > > > >>> Piotrek
> > > > > > >>>
> > > > > > >>> pt., 11 sie 2023 o 14:55 Dong Lin <lindon...@gmail.com>
> > > > napisał(a):
> > > > > > >>>
> > > > > > >>>> Hi Piotr,
> > > > > > >>>>
> > > > > > >>>> Thanks for the reply!
> > > > > > >>>>
> > > > > > >>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <
> > > > > > piotr.nowoj...@gmail.com
> > > > > > >>>>
> > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > >>>>> Hi,
> > > > > > >>>>>
> > > > > > >>>>> Sorry for the long delay in responding!
> > > > > > >>>>>
> > > > > > >>>>>> Given that it is an optional feature that can be
> > > > > > >>>>>> turned off by users, it might be OK to just let users try
> it
> > > out
> > > > > and
> > > > > > >>> we
> > > > > > >>>>> can
> > > > > > >>>>>> fix performance issues once we detect any of them. What do
> > you
> > > > > > think?
> > > > > > >>>>> I think it's fine. It would be best to mark this feature as
> > > > > > >>> experimental,
> > > > > > >>>>> and
> > > > > > >>>>> we say that the config keys or the default values might
> > change
> > > in
> > > > > the
> > > > > > >>>>> future.
> > > > > > >>>>>
> > > > > > >>>> In general I agree we can mark APIs that determine "whether
> to
> > > > > enable
> > > > > > >>>> dynamic switching between stream/batch mode" as
> experimental.
> > > > > > >>>>
> > > > > > >>>> However, I am not sure we have such an API yet. The APIs
> added
> > > in
> > > > > this
> > > > > > >>> FLIP
> > > > > > >>>> are intended to be used by operator developers rather than
> end
> > > > > users.
> > > > > > > End
> > > > > > >>>> users can enable this capability by setting
> > > > > > >>>> execution.checkpointing.interval-during-backlog = Long.MAX
> and
> > > > uses
> > > > > a
> > > > > > >>>> source which might implicitly set backlog statu (e.g.
> > > > HybridSource).
> > > > > > So
> > > > > > >>>> execution.checkpointing.interval-during-backlog is the only
> > > > > > user-facing
> > > > > > >>>> APIs that can always control whether this feature can be
> used.
> > > > > > >>>>
> > > > > > >>>> However, execution.checkpointing.interval-during-backlog
> > itself
> > > is
> > > > > not
> > > > > > >>> tied
> > > > > > >>>> to FLIP-327.
> > > > > > >>>>
> > > > > > >>>> Do you mean we should set
> > checkpointing.interval-during-backlog
> > > as
> > > > > > >>>> experimental? Alternatively, how about we add a doc for
> > > > > > >>>> checkpointing.interval-during-backlog explaining its
> > > > impact/concern
> > > > > as
> > > > > > >>>> discussed above?
> > > > > > >>>>
> > > > > > >>>> Best,
> > > > > > >>>> Dong
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>>> Maybe we can revisit the need for such a config when we
> > > > > > >>>> introduce/discuss
> > > > > > >>>>>> the capability to switch backlog from false to true in the
> > > > future.
> > > > > > >>> What
> > > > > > >>>>> do
> > > > > > >>>>>> you think?
> > > > > > >>>>> Sure, we can do that.
> > > > > > >>>>>
> > > > > > >>>>> Best,
> > > > > > >>>>> Piotrek
> > > > > > >>>>>
> > > > > > >>>>> niedz., 23 lip 2023 o 14:32 Dong Lin <lindon...@gmail.com>
> > > > > > napisał(a):
> > > > > > >>>>>
> > > > > > >>>>>> Hi Piotr,
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks a lot for the explanation. Please see my reply
> > inline.
> > > > > > >>>>>>
> > > > > > >>>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> > > > > > >>>>> piotr.nowoj...@gmail.com>
> > > > > > >>>>>> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi Dong,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks a lot for the answers. I can now only briefly
> answer
> > > > your
> > > > > > >>> last
> > > > > > >>>>>>> email.
> > > > > > >>>>>>>
> > > > > > >>>>>>>> It is possible that spilling to disks might cause larger
> > > > > > >>> overhead.
> > > > > > >>>>> IMO
> > > > > > >>>>>> it
> > > > > > >>>>>>>> is an orthogonal issue already existing in Flink. This
> is
> > > > > > >>> because a
> > > > > > >>>>>> Flink
> > > > > > >>>>>>>> job running batch mode might also be slower than its
> > > > throughput
> > > > > > >>> in
> > > > > > >>>>>> stream
> > > > > > >>>>>>>> mode due to the same reason.
> > > > > > >>>>>>> Yes, I know, but the thing that worries me is that
> > previously
> > > > > only
> > > > > > >>> a
> > > > > > >>>>> user
> > > > > > >>>>>>> alone
> > > > > > >>>>>>> could decide whether to use batch mode or streaming, and
> in
> > > > > > >>> practice
> > > > > > >>>>> one
> > > > > > >>>>>>> user would rarely (if ever) use both for the same
> > > > > > >>> problem/job/query.
> > > > > > >>>> If
> > > > > > >>>>>> his
> > > > > > >>>>>>> intention was to eventually process live data, he was
> using
> > > > > > >>> streaming
> > > > > > >>>>>> even
> > > > > > >>>>>>> if there was a large backlog at the start (apart of some
> > very
> > > > few
> > > > > > >>>> very
> > > > > > >>>>>>> power
> > > > > > >>>>>>> users).
> > > > > > >>>>>>>
> > > > > > >>>>>>> With this change, we want to introduce a mode that would
> be
> > > > > > >>> switching
> > > > > > >>>>>> back
> > > > > > >>>>>>> and forth between streaming and "batch in streaming"
> > > > > automatically.
> > > > > > >>>> So
> > > > > > >>>>> a
> > > > > > >>>>>>> potential performance regression would be much more
> visible
> > > and
> > > > > > >>>> painful
> > > > > > >>>>>>> at the same time. If batch query runs slower then it
> could,
> > > > it's
> > > > > > >>> kind
> > > > > > >>>>> of
> > > > > > >>>>>>> fine as
> > > > > > >>>>>>> it will end at some point. If streaming query during
> large
> > > back
> > > > > > >>>>> pressure
> > > > > > >>>>>>> maybe
> > > > > > >>>>>>> temporary load spike switches to batch processing,
> that's a
> > > > > bigger
> > > > > > >>>>> deal.
> > > > > > >>>>>>> Especially if batch processing mode will not be able to
> > > > actually
> > > > > > >>> even
> > > > > > >>>>>>> handle
> > > > > > >>>>>>> the normal load, after the load spike. In that case, the
> > job
> > > > > could
> > > > > > >>>>> never
> > > > > > >>>>>>> recover
> > > > > > >>>>>>> from the backpressure/backlog mode.
> > > > > > >>>>>>>
> > > > > > >>>>>> I understand you are concerned with the risk of
> performance
> > > > > > >>> regression
> > > > > > >>>>>> introduced due to switching to batch mode.
> > > > > > >>>>>>
> > > > > > >>>>>> After thinking about this more, I think this existing
> > proposal
> > > > > meets
> > > > > > >>>> the
> > > > > > >>>>>> minimum requirement of "not introducing regression for
> > > existing
> > > > > > >>> jobs".
> > > > > > >>>>> The
> > > > > > >>>>>> reason is that even if batch mode can be slower than
> stream
> > > mode
> > > > > for
> > > > > > >>>> some
> > > > > > >>>>>> operators in some cases, this is an optional feature that
> > will
> > > > > only
> > > > > > >>> be
> > > > > > >>>>>> enabled if a user explicitly overrides the newly
> introduced
> > > > config
> > > > > > to
> > > > > > >>>>>> non-default values. Existing jobs that simply upgrade
> their
> > > > Flink
> > > > > > >>>> library
> > > > > > >>>>>> version will not suffer any performance regression.
> > > > > > >>>>>>
> > > > > > >>>>>> More specifically, in order to switch to batch mode, users
> > > will
> > > > > need
> > > > > > >>> to
> > > > > > >>>>>> explicitly set
> > execution.checkpointing.interval-during-backlog
> > > > to
> > > > > 0.
> > > > > > >>>> And
> > > > > > >>>>>> users can always explicitly update
> > > > > > >>>>>> execution.checkpointing.interval-during-backlog to turn
> off
> > > the
> > > > > > batch
> > > > > > >>>>> mode
> > > > > > >>>>>> if that incurs any performance issue.
> > > > > > >>>>>>
> > > > > > >>>>>> As far as I can tell, for all practical workloads we see
> in
> > > > > > >>> production
> > > > > > >>>>>> jobs, batch mode is always faster (w.r.t. throughput) than
> > > > stream
> > > > > > >>> mode
> > > > > > >>>>> when
> > > > > > >>>>>> there is a high backlog of incoming records. Though it is
> > > still
> > > > > > >>>>>> theoretically possible, it should be very rare (if any)
> for
> > > > batch
> > > > > > >>> mode
> > > > > > >>>> to
> > > > > > >>>>>> be slower in practice. Given that it is an optional
> feature
> > > that
> > > > > can
> > > > > > >>> be
> > > > > > >>>>>> turned off by users, it might be OK to just let users try
> it
> > > out
> > > > > and
> > > > > > >>> we
> > > > > > >>>>> can
> > > > > > >>>>>> fix performance issues once we detect any of them. What do
> > you
> > > > > > think?
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>>> execution.backlog.use-full-batch-mode-on-start (default
> > > false)
> > > > > > >>>>>>> ops sorry, it was supposed to be sth like:
> > > > > > >>>>>>>
> > > > > > >>>>>>> execution.backlog.use-batch-mode-only-on-start (default
> > > false)
> > > > > > >>>>>>>
> > > > > > >>>>>>> That option would disallow switching from streaming to
> > batch.
> > > > > Batch
> > > > > > >>>>> mode
> > > > > > >>>>>>> would be allowed only to get rid of the initial, present
> on
> > > > > > >>> start-up
> > > > > > >>>>>>> backlog.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Would allow us to safely experiment with switching from
> > > > streaming
> > > > > > >>> to
> > > > > > >>>>>> batch
> > > > > > >>>>>>> and I would be actually more fine in enabling "using
> batch
> > > mode
> > > > > on
> > > > > > >>>>> start"
> > > > > > >>>>>>> by default, until we gain confidence and feedback that
> > > > switching
> > > > > > >>>> back &
> > > > > > >>>>>>> forth
> > > > > > >>>>>>> is working as expected.
> > > > > > >>>>>>>
> > > > > > >>>>>> Now I understand what you are suggesting. I agree that it
> is
> > > > > > >>> necessary
> > > > > > >>>>> for
> > > > > > >>>>>> users to be able to disallow switching from streaming to
> > > batch.
> > > > > > >>>>>>
> > > > > > >>>>>> I am not sure it is necessary to introduce an extra config
> > > just
> > > > > for
> > > > > > >>>> this
> > > > > > >>>>>> purpose. The reason is that we don't have any strategy
> that
> > > > > switches
> > > > > > >>>>>> backlog status from false to true yet. And when we have
> such
> > > > > > strategy
> > > > > > >>>>> (e.g.
> > > > > > >>>>>> FLIP-328) in the future, it is very likely that we will
> > > > introduce
> > > > > > >>> extra
> > > > > > >>>>>> config(s) for users to explicitly turn on such a feature.
> > That
> > > > > means
> > > > > > >>>> user
> > > > > > >>>>>> should be able to turn off this feature even if we don't
> > have
> > > > > > >>> something
> > > > > > >>>>>> like execution.backlog.use-batch-mode-only-on-start.
> > > > > > >>>>>>
> > > > > > >>>>>> Maybe we can revisit the need for such a config when we
> > > > > > >>>> introduce/discuss
> > > > > > >>>>>> the capability to switch backlog from false to true in the
> > > > future.
> > > > > > >>> What
> > > > > > >>>>> do
> > > > > > >>>>>> you think?
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>>>> Or we could limit the scope of this FLIP to only
> support
> > > > > > >>> starting
> > > > > > >>>>> with
> > > > > > >>>>>>>>> batch mode and switching only once to
> > > > > > >>>>>>>>> streaming, and design a follow up with switching back
> and
> > > > > forth?
> > > > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this
> > FLIP
> > > > into
> > > > > > >>>> two
> > > > > > >>>>>>> FLIPs
> > > > > > >>>>>>>> so that we can make incremental progress.
> > > > > > >>>>>>> Great, let's do that. In a follow up FLIP we could
> restart
> > > the
> > > > > > >>>>> discussion
> > > > > > >>>>>>> about
> > > > > > >>>>>>> switching back and forth.
> > > > > > >>>>>>>
> > > > > > >>>>>> Cool, I added the following statement to the motivation
> > > section.
> > > > > > >>>>>>
> > > > > > >>>>>> "NOTE: this FLIP focuses only on the capability to switch
> > from
> > > > > batch
> > > > > > >>> to
> > > > > > >>>>>> stream mode. If there is any extra API needed to support
> > > > switching
> > > > > > >>> from
> > > > > > >>>>>> stream to batch mode, we will discuss them in a follow-up
> > > FLIP."
> > > > > > >>>>>>
> > > > > > >>>>>> I am looking forward to reading your follow-up thoughts!
> > > > > > >>>>>>
> > > > > > >>>>>> Best,
> > > > > > >>>>>> Dong
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>> Piotrek
> > > > > > >>>>>>>
> > > > > > >>>>>>> czw., 20 lip 2023 o 16:57 Dong Lin <lindon...@gmail.com>
> > > > > > >>> napisał(a):
> > > > > > >>>>>>>> Hi Piotr,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thank you for the very detailed comments! Please see my
> > > reply
> > > > > > >>>> inline.
> > > > > > >>>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <
> > > > > > >>>>>>> piotr.nowoj...@gmail.com>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> I have a couple of follow up questions about switching
> > back
> > > > and
> > > > > > >>>>> forth
> > > > > > >>>>>>>>> between streaming and batching mode.
> > > > > > >>>>>>>>> Especially around shuffle/watermark strategy, and keyed
> > > state
> > > > > > >>>>>> backend.
> > > > > > >>>>>>>>> First of all, it might not always be beneficial to
> switch
> > > > into
> > > > > > >>>> the
> > > > > > >>>>>>> batch
> > > > > > >>>>>>>>> modes:
> > > > > > >>>>>>>>> - Shuffle strategy
> > > > > > >>>>>>>>>     - Is sorting going to be purely in-memory? If not,
> > > > > > >>> obviously
> > > > > > >>>>>>> spilling
> > > > > > >>>>>>>>> to disks might cause larger overheads
> > > > > > >>>>>>>>>        compared to not sorting the records.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>> Sorting might require spilling data to disk depending on
> > the
> > > > > > >>> input
> > > > > > >>>>>> size.
> > > > > > >>>>>>>> The behavior of sorting w.r.t. memory/disk is expected
> to
> > be
> > > > > > >>>> exactly
> > > > > > >>>>>> the
> > > > > > >>>>>>>> same as the behavior of input sorting automatically
> > > performed
> > > > by
> > > > > > >>>>> Flink
> > > > > > >>>>>>>> runtime in batch mode for keyed inputs.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> More specifically, ExternalSorter
> > > > > > >>>>>>>> <
> > > > > > >>>>>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java
> > > > > > >>>>>>>> is
> > > > > > >>>>>>>> currently used to sort keyed inputs in batch mode. It is
> > > > > > >>>>> automatically
> > > > > > >>>>>>> used
> > > > > > >>>>>>>> by Flink runtime in OneInputStreamTask (here
> > > > > > >>>>>>>> <
> > > > > > >>>>>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114
> > > > > > >>>>>>>>> )
> > > > > > >>>>>>>> and in MultiInputSortingDataInput (here
> > > > > > >>>>>>>> <
> > > > > > >>>>>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188
> > > > > > >>>>>>>>> ).
> > > > > > >>>>>>>> We plan to re-use the same code/mechanism to do sorting.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> It is possible that spilling to disks might cause larger
> > > > > > >>> overhead.
> > > > > > >>>>> IMO
> > > > > > >>>>>> it
> > > > > > >>>>>>>> is an orthogonal issue already existing in Flink. This
> is
> > > > > > >>> because a
> > > > > > >>>>>> Flink
> > > > > > >>>>>>>> job running batch mode might also be slower than its
> > > > throughput
> > > > > > >>> in
> > > > > > >>>>>> stream
> > > > > > >>>>>>>> mode due to the same reason. However, even though it is
> > > > possible
> > > > > > >>> in
> > > > > > >>>>>>> theory,
> > > > > > >>>>>>>> I expect that in practice the throughput of using
> sorting
> > +
> > > > > > >>>>>>>> BatchExecutionKeyedStateBackend should be much higher
> than
> > > > using
> > > > > > >>>>> other
> > > > > > >>>>>>>> keyed statebackends when the amount of data is large.
> As a
> > > > > matter
> > > > > > >>>> of
> > > > > > >>>>>>> fact,
> > > > > > >>>>>>>> we have not heard of complaints of such performance
> > > regression
> > > > > > >>>> issues
> > > > > > >>>>>> in
> > > > > > >>>>>>>> batch mode.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> The primary goal of this FLIP is to allow the operator
> to
> > > run
> > > > at
> > > > > > >>>> the
> > > > > > >>>>>> same
> > > > > > >>>>>>>> throughput (in stream mode when there is backlog) as it
> > can
> > > > > > >>>> currently
> > > > > > >>>>>> do
> > > > > > >>>>>>> in
> > > > > > >>>>>>>> batch mode. And this goal is not affected by the disk
> > > overhead
> > > > > > >>>> issue
> > > > > > >>>>>>>> mentioned above.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I am thinking maybe we can treat it as an orthogonal
> > > > performance
> > > > > > >>>>>>>> optimization problem instead of solving this problem in
> > this
> > > > > > >>> FLIP?
> > > > > > >>>>>>>>     - If it will be at least partially in-memory, does
> > Flink
> > > > > have
> > > > > > >>>>> some
> > > > > > >>>>>>>>> mechanism to reserve optional memory that
> > > > > > >>>>>>>>>       can be revoked if a new operator starts up? Can
> > this
> > > > > > >>> memory
> > > > > > >>>>> be
> > > > > > >>>>>>>>> redistributed? Ideally we should use as
> > > > > > >>>>>>>>>       much as possible of the available memory to avoid
> > > > > > >>> spilling
> > > > > > >>>>>> costs,
> > > > > > >>>>>>>> but
> > > > > > >>>>>>>>> also being able to revoke that memory
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>> This FLIP does not support dynamically
> > > revoking/redistribuitng
> > > > > > >>>>> managed
> > > > > > >>>>>>>> memory used by the ExternalSorter.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> For operators with isInternalSorterSupported = true, we
> > will
> > > > > > >>>> allocate
> > > > > > >>>>>> to
> > > > > > >>>>>>>> this operator execution.sorted-inputs.memory
> > > > > > >>>>>>>> <
> > > > > > >>>>>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144
> > > > > > >>>>>>>> amount of managed memory. This is the same as how Flink
> > > > > allocates
> > > > > > >>>>>> managed
> > > > > > >>>>>>>> memory to an operator when this operator has keyed
> inputs
> > in
> > > > > > >>> batch
> > > > > > >>>>>> mode.
> > > > > > >>>>>>>> Note that this FLIP intends to support operators to sort
> > > > inputs
> > > > > > >>>>>> whenever
> > > > > > >>>>>>>> there is backlog. And there is currently no way for an
> > > > operator
> > > > > > >>> to
> > > > > > >>>>> know
> > > > > > >>>>>>> in
> > > > > > >>>>>>>> advance whether there will be no backlog after a given
> > time.
> > > > So
> > > > > > >>> it
> > > > > > >>>>>> seems
> > > > > > >>>>>>>> simpler to just keep managed memory for such an operator
> > > > > > >>> throughout
> > > > > > >>>>> the
> > > > > > >>>>>>>> lifecycle of this operator, for now.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Besides, it seems that the lack of ability to
> dynamically
> > > > > > >>>>>>>> revoke/redistribute un-used managed memory is an
> existing
> > > > issue
> > > > > > >>> in
> > > > > > >>>>>> Flink.
> > > > > > >>>>>>>> For example, we might have two operators sharing the
> same
> > > slot
> > > > > > >>> and
> > > > > > >>>>>> these
> > > > > > >>>>>>>> two operators both use managed memory (e.g. to sort
> > inputs).
> > > > > > >>> There
> > > > > > >>>> is
> > > > > > >>>>>>>> currently no way for one operator to re-use the memory
> not
> > > > used
> > > > > > >>> by
> > > > > > >>>>> the
> > > > > > >>>>>>>> other operator.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Therefore, I think we can treat this as an orthogonal
> > > > > performance
> > > > > > >>>>>>>> optimization problem which can be addressed separately.
> > What
> > > > do
> > > > > > >>> you
> > > > > > >>>>>>> think?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>>     - Sometimes sorting, even if we have memory to do
> > that,
> > > > > > >>> might
> > > > > > >>>>> be
> > > > > > >>>>>> an
> > > > > > >>>>>>>>> unnecessary overhead.
> > > > > > >>>>>>>>> - Watermarks
> > > > > > >>>>>>>>>     - Is holding back watermarks always good? If we
> have
> > > tons
> > > > > > >>> of
> > > > > > >>>>> data
> > > > > > >>>>>>>>> buffered/sorted and waiting to be processed
> > > > > > >>>>>>>>>        with multiple windows per key and many different
> > > keys.
> > > > > > >>>> When
> > > > > > >>>>> we
> > > > > > >>>>>>>>> switch back to `isBacklog=false` we
> > > > > > >>>>>>>>>        first process all of that data before processing
> > > > > > >>>> watermarks,
> > > > > > >>>>>> for
> > > > > > >>>>>>>>> operators that are not using sorted input the
> > > > > > >>>>>>>>>        state size can explode significantly causing
> lots
> > of
> > > > > > >>>>> problems.
> > > > > > >>>>>>>> Even
> > > > > > >>>>>>>>> for those that can use sorting, switching to
> > > > > > >>>>>>>>>        sorting or BatchExecutionKeyedStateBackend is
> not
> > > > > > >>> always a
> > > > > > >>>>>> good
> > > > > > >>>>>>>>> idea, but keeping RocksDB also can be
> > > > > > >>>>>>>>>        risky.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>> With the current FLIP, the proposal is to use a sorter
> > only
> > > > when
> > > > > > >>>> the
> > > > > > >>>>>>> inputs
> > > > > > >>>>>>>> have keys. According to this practice, operators which
> are
> > > not
> > > > > > >>>> using
> > > > > > >>>>>>>> sorting should have un-keyed inputs. I believe such an
> > > > operator
> > > > > > >>>> will
> > > > > > >>>>>> not
> > > > > > >>>>>>>> even use a keyed state backend. Maybe I missed some
> > > use-case.
> > > > > Can
> > > > > > >>>> you
> > > > > > >>>>>>>> provide a use-case where we will have an operator with
> > > > un-keyed
> > > > > > >>>>> inputs
> > > > > > >>>>>>>> whose state size can explode due to we holding back
> > > > watermarks?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> For operators with keyed inputs that use sorting, I
> > suppose
> > > it
> > > > > is
> > > > > > >>>>>>> possible
> > > > > > >>>>>>>> that sorting + BatchExecutionKeyedStateBackend can be
> > worse
> > > > than
> > > > > > >>>>> using
> > > > > > >>>>>>>> RocksDB. But I believe this is very very rare (if
> > possible)
> > > in
> > > > > > >>>> almost
> > > > > > >>>>>>>> practical usage of Flink.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Take one step back, if this indeed cause regression for
> a
> > > real
> > > > > > >>>>>> use-case,
> > > > > > >>>>>>>> user can set
> > execution.checkpointing.interval-during-backlog
> > > > to
> > > > > > >>>>>> anything
> > > > > > >>>>>>>> other than 0 so that this FLIP will not use
> > > > > > >>>>>>>> sorter + BatchExecutionKeyedStateBackend even even when
> > > there
> > > > is
> > > > > > >>>>>> backlog.
> > > > > > >>>>>>>> I would hope we can find a way to automatically
> determine
> > > > > whether
> > > > > > >>>>> using
> > > > > > >>>>>>>> sorting + BatchExecutionKeyedStateBackend can be better
> or
> > > > worse
> > > > > > >>>> than
> > > > > > >>>>>>> using
> > > > > > >>>>>>>> RocksDB alone. But I could not find a good and reliable
> > way
> > > to
> > > > > do
> > > > > > >>>>> this.
> > > > > > >>>>>>>> Maybe we can update Flink to do this when we find a good
> > way
> > > > to
> > > > > > >>> do
> > > > > > >>>>> this
> > > > > > >>>>>>> in
> > > > > > >>>>>>>> the future?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> - Keyed state backend
> > > > > > >>>>>>>>>     - I think you haven't described what happens during
> > > > > > >>> switching
> > > > > > >>>>>> from
> > > > > > >>>>>>>>> streaming to backlog processing.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>> Good point. This indeed needs to be described. I added a
> > > TODO
> > > > in
> > > > > > >>>> the
> > > > > > >>>>>>>> "Behavior changes ..." section to describe what happens
> > when
> > > > > > >>>>> isBacklog
> > > > > > >>>>>>>> switches from false to true, for all
> > > > > > >>>>> watermark/checkpoint/statebackend
> > > > > > >>>>>>> etc.
> > > > > > >>>>>>>> Let me explain this for the state backend here for now.
> I
> > > will
> > > > > > >>>> update
> > > > > > >>>>>>> FLIP
> > > > > > >>>>>>>> later.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> When isBacklog switches from false to true, operator
> with
> > > > keyed
> > > > > > >>>>> inputs
> > > > > > >>>>>>> can
> > > > > > >>>>>>>> optionally (as determined by its implementation) starts
> to
> > > use
> > > > > > >>>>> internal
> > > > > > >>>>>>>> sorter to sort inputs by key, without processing inputs
> or
> > > > > > >>> updating
> > > > > > >>>>>>>> statebackend, until it receives end-of-inputs or
> isBacklog
> > > is
> > > > > > >>>>> switched
> > > > > > >>>>>> to
> > > > > > >>>>>>>> false again.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>>     - Switch can be an unnecessary overhead.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I agree it can cause unnecessary overhead, particularly
> > when
> > > > > > >>>>> isBacklog
> > > > > > >>>>>>>> switches back and forth frequently. Whether or not this
> is
> > > > > > >>>>> unnecessary
> > > > > > >>>>>>>> likely depends on the duration/throughput of the backlog
> > > phase
> > > > > as
> > > > > > >>>>> well
> > > > > > >>>>>> as
> > > > > > >>>>>>>> the specific computation logic of the operator. I am not
> > > sure
> > > > > > >>> there
> > > > > > >>>>> is
> > > > > > >>>>>> a
> > > > > > >>>>>>>> good way for Flink to determine in advance whether
> > switching
> > > > is
> > > > > > >>>>>>>> unnecessary.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Note that for the existing use-case where we expect to
> > > change
> > > > > > >>>>> isBacklog
> > > > > > >>>>>>> to
> > > > > > >>>>>>>> true (e.g. MySQL CDC snapshot phase, Kafka source
> > watermark
> > > > lag
> > > > > > >>>> being
> > > > > > >>>>>> too
> > > > > > >>>>>>>> high), we don't expect the watermark to switch back and
> > > force
> > > > > > >>>>>> frequently.
> > > > > > >>>>>>>> And user can disable this switch by setting
> > > > > > >>>>>>>> execution.checkpointing.interval-during-backlog to
> > anything
> > > > > other
> > > > > > >>>>> than
> > > > > > >>>>>> 0.
> > > > > > >>>>>>>> Therefore, I am wondering if we can also view this as a
> > > > > > >>> performance
> > > > > > >>>>>>>> optimization opportunity for extra use-cases in the
> > future,
> > > > > > >>> rather
> > > > > > >>>>>> than a
> > > > > > >>>>>>>> blocking issue of this FLIP for the MVP use-case (e.g.
> > > > snapshot
> > > > > > >>>> phase
> > > > > > >>>>>> for
> > > > > > >>>>>>>> any CDC source, Kafka watermark lag).
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> At the same time, in your current proposal, for
> > > > > > >>>>>>>>> `execution.checkpointing.interval-during-backlog > 0`
> we
> > > > won't
> > > > > > >>>>>>>>> switch to "batch" mode at all. That's a bit of shame, I
> > > don't
> > > > > > >>>>>>> understand
> > > > > > >>>>>>>>> why those two things should be coupled
> > > > > > >>>>>>>>> together?
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>> We can in general classify optimizations as those that
> are
> > > > > > >>>> compatible
> > > > > > >>>>>>> with
> > > > > > >>>>>>>> checkpointing, and those that are not compatible with
> > > > > > >>>> checkpointing.
> > > > > > >>>>>> For
> > > > > > >>>>>>>> example, input sorting is currently not compatible with
> > > > > > >>>>> checkpointing.
> > > > > > >>>>>>> And
> > > > > > >>>>>>>> buffering input records to reduce state backend overhead
> > > (and
> > > > > > >>>>> probably
> > > > > > >>>>>>>> columnar processing for mini-batch in the future) is
> > > > compatible
> > > > > > >>>> with
> > > > > > >>>>>>>> checkpointing.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> The primary of FLIP-327 is to support optimizations not
> > > > > > >>> compatible
> > > > > > >>>>> with
> > > > > > >>>>>>>> checkpointing. If
> > > > > > >>> execution.checkpointing.interval-during-backlog >
> > > > > > >>>>> 0,
> > > > > > >>>>>>>> which means that user intends to still do checkpointing
> > even
> > > > > when
> > > > > > >>>>> there
> > > > > > >>>>>>> is
> > > > > > >>>>>>>> backog, then we will not be able to support such
> > > > optimizations.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> For optimizations that are compatible with
> checkpointing,
> > we
> > > > can
> > > > > > >>> do
> > > > > > >>>>>> this
> > > > > > >>>>>>>> even when the operator does not run in "batch mode".
> There
> > > are
> > > > > > >>>> extra
> > > > > > >>>>>>>> problems to solve in order to achieve this optimization,
> > > such
> > > > as
> > > > > > >>>>>>> supporting
> > > > > > >>>>>>>> unaligned checkpointing without prolonging its sync
> > phase. I
> > > > > plan
> > > > > > >>>> to
> > > > > > >>>>>>>> explain how this can be done in FLIP-325.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> All in all, shouldn't we aim for some more clever
> process
> > > of
> > > > > > >>>>>> switching
> > > > > > >>>>>>>> back
> > > > > > >>>>>>>>> and forth between streaming/batch modes
> > > > > > >>>>>>>>> for watermark strategy/state backend/sorting based on
> > some
> > > > > > >>>> metrics?
> > > > > > >>>>>>>> Trying
> > > > > > >>>>>>>>> to either predict if switching might help,
> > > > > > >>>>>>>>> or trying to estimate if the last switch was
> beneficial?
> > > > Maybe
> > > > > > >>>>>>> something
> > > > > > >>>>>>>>> along the lines:
> > > > > > >>>>>>>>> - sort only in memory and during sorting count the
> number
> > > of
> > > > > > >>>>> distinct
> > > > > > >>>>>>>> keys
> > > > > > >>>>>>>>> (NDK)
> > > > > > >>>>>>>>>     - maybe allow for spilling if so far in memory we
> > have
> > > > NDK
> > > > > > >>> *
> > > > > > >>>> 5
> > > > > > >>>>>> =
> > > > > > >>>>>>>>> #records
> > > > > > >>>>>>>>> - do not allow to buffer records above a certain
> > threshold,
> > > > as
> > > > > > >>>>>>> otherwise
> > > > > > >>>>>>>>> checkpointing can explode
> > > > > > >>>>>>>>> - switch to `BatchExecutionKeyedStateBackend` only if
> NDK
> > > * 2
> > > > > > >>>> =
> > > > > > >>>>>>> #records
> > > > > > >>>>>>>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <=
> > #records
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Or even maybe for starters something even simpler and
> > then
> > > > test
> > > > > > >>>> out
> > > > > > >>>>>>>>> something more fancy as a follow up?
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>> I agree it is worth investigating these ideas to further
> > > > > optimize
> > > > > > >>>> the
> > > > > > >>>>>>>> performance during backlog.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I just think these can be done independently after this
> > > FLIP.
> > > > > The
> > > > > > >>>>> focus
> > > > > > >>>>>>> of
> > > > > > >>>>>>>> this FLIP is to re-use in stream mode the same
> > optimization
> > > > > which
> > > > > > >>>> we
> > > > > > >>>>>>>> already use in batch mode, rather than inventing or
> > > improving
> > > > > the
> > > > > > >>>>>>>> performance of these existing optimizations.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Given that there are already a lot of new
> > mechanism/features
> > > > to
> > > > > > >>>>> discuss
> > > > > > >>>>>>> and
> > > > > > >>>>>>>> address in this FLIP, I am hoping we can limit the scope
> > of
> > > > this
> > > > > > >>>> FLIP
> > > > > > >>>>>> to
> > > > > > >>>>>>>> re-use the existing optimization, and do these extra
> > > > > optimization
> > > > > > >>>>>>>> opportunities as future work.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> What do you think?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> At the same time,
> > > > > > >>>>> `execution.checkpointing.interval-during-backlog=0`
> > > > > > >>>>>>>> seems
> > > > > > >>>>>>>>> a weird setting to me, that I would
> > > > > > >>>>>>>>> not feel safe recommending to anyone. If processing of
> a
> > > > > > >>> backlog
> > > > > > >>>>>> takes
> > > > > > >>>>>>> a
> > > > > > >>>>>>>>> long time, a job might stop making
> > > > > > >>>>>>>>> any progress due to some random failures. Especially
> > > > dangerous
> > > > > > >>>> if a
> > > > > > >>>>>> job
> > > > > > >>>>>>>> switches from streaming mode back to
> > > > > > >>>>>>>>> backlog processing due to some reasons, as that could
> > > happen
> > > > > > >>>> months
> > > > > > >>>>>>> after
> > > > > > >>>>>>>>> someone started a job with this
> > > > > > >>>>>>>>> strange setting. So should we even have it? I would
> > simply
> > > > > > >>>> disallow
> > > > > > >>>>>>> it. I
> > > > > > >>>>>>>> Good point. I do agree we need to further work to
> improve
> > > the
> > > > > > >>>>> failover
> > > > > > >>>>>>>> performance in case any task fails.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> As of the current FLIP, if any task fails during backlog
> > and
> > > > > > >>>>>>>> execution.checkpointing.interval-during-backlog = 0, we
> > will
> > > > > need
> > > > > > >>>> to
> > > > > > >>>>>>>> restart all operators to the last checkpointed state and
> > > > > continue
> > > > > > >>>>>>>> processing backlog. And this can be a lot of rollback
> > since
> > > > > there
> > > > > > >>>> is
> > > > > > >>>>> no
> > > > > > >>>>>>>> checkpoint during backlog. And this can also be worse
> than
> > > > batch
> > > > > > >>>>> since
> > > > > > >>>>>>> this
> > > > > > >>>>>>>> FLIP currently does not support exporting/saving records
> > to
> > > > > local
> > > > > > >>>>> disk
> > > > > > >>>>>>> (or
> > > > > > >>>>>>>> shuffle service) so that a failed task can re-consume
> the
> > > > > records
> > > > > > >>>>> from
> > > > > > >>>>>>> the
> > > > > > >>>>>>>> upstream task (or shuffle service) in the same way as
> how
> > > > Flink
> > > > > > >>>>>> failover
> > > > > > >>>>>>> a
> > > > > > >>>>>>>> task in batch mode.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I think we can extend this FLIP to solve this problem so
> > > that
> > > > it
> > > > > > >>>> can
> > > > > > >>>>>> have
> > > > > > >>>>>>>> at least the same behavior/performance as batch-mode
> job.
> > > The
> > > > > > >>> idea
> > > > > > >>>> is
> > > > > > >>>>>> to
> > > > > > >>>>>>>> also follow what batch mode does. For example, we can
> > > trigger
> > > > a
> > > > > > >>>>>>> checkpoint
> > > > > > >>>>>>>> when isBacklog switches to true, and every operator
> should
> > > > > buffer
> > > > > > >>>> its
> > > > > > >>>>>>>> output in the TM local disk (or remote shuffle service).
> > > > > > >>> Therefore,
> > > > > > >>>>>>> after a
> > > > > > >>>>>>>> task fails, it can restart from the last checkpoint and
> > > > > > >>> re-consume
> > > > > > >>>>> data
> > > > > > >>>>>>>> buffered in the upstream task.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I will update FLIP as described above. Would this
> address
> > > your
> > > > > > >>>>> concern?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> could see a power setting like:
> > > > > > >>>>>>>>>         `execution.backlog.use-full-batch-mode-on-start
> > > > > > >>> (default
> > > > > > >>>>>>> false)`
> > > > > > >>>>>>>> I am not sure I fully understand this config or its
> > > > motivation.
> > > > > > >>> Can
> > > > > > >>>>> you
> > > > > > >>>>>>>> help explain the exact semantics of this config?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> that would override any heuristic of switching to
> backlog
> > > if
> > > > > > >>>>> someone
> > > > > > >>>>>> is
> > > > > > >>>>>>>>> submitting a new job that starts with
> > > > > > >>>>>>>>> `isBacklog=true`.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Or we could limit the scope of this FLIP to only
> support
> > > > > > >>> starting
> > > > > > >>>>>> with
> > > > > > >>>>>>>>> batch mode and switching only once to
> > > > > > >>>>>>>>> streaming, and design a follow up with switching back
> and
> > > > > > >>> forth?
> > > > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this
> > FLIP
> > > > into
> > > > > > >>>> two
> > > > > > >>>>>>> FLIPs
> > > > > > >>>>>>>> so that we can make incremental progress.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Best,
> > > > > > >>>>>>>> Dong
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> I'm looking forwards to hearing/reading out your
> > thoughts.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Best,
> > > > > > >>>>>>>>> Piotrek
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> śr., 12 lip 2023 o 12:38 Jing Ge
> > > <j...@ververica.com.invalid
> > > > >
> > > > > > >>>>>>>> napisał(a):
> > > > > > >>>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Thanks for your reply!
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Best regards,
> > > > > > >>>>>>>>>> Jing
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <
> > > > > > >>> lindon...@gmail.com>
> > > > > > >>>>>>> wrote:
> > > > > > >>>>>>>>>>> Hi Jing,
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thanks for the comments. Please see my reply inline.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge
> > > > > > >>>>>> <j...@ververica.com.invalid
> > > > > > >>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thanks for the clarification. Now it is clear for
> me.
> > I
> > > > > > >>> got
> > > > > > >>>>>>>>> additional
> > > > > > >>>>>>>>>>> noob
> > > > > > >>>>>>>>>>>> questions wrt the internal sorter.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 1. when to call setter to set the
> > > internalSorterSupported
> > > > > > >>>> to
> > > > > > >>>>> be
> > > > > > >>>>>>>> true?
> > > > > > >>>>>>>>>>> Developer of the operator class (i.e. those classes
> > which
> > > > > > >>>>>>> implements
> > > > > > >>>>>>>>>>> `StreamOperator`) should override the
> > > > > > >>>>> `#getOperatorAttributes()`
> > > > > > >>>>>>> API
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>> set
> > > > > > >>>>>>>>>>> internalSorterSupported to true, if he/she decides to
> > > sort
> > > > > > >>>>>> records
> > > > > > >>>>>>>>>>> internally in the operator.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 2
> > > > > > >>>>>>>>>>>> *"For those operators whose throughput can be
> > > > > > >>> considerably
> > > > > > >>>>>>> improved
> > > > > > >>>>>>>>>> with
> > > > > > >>>>>>>>>>> an
> > > > > > >>>>>>>>>>>> internal sorter, update it to take advantage of the
> > > > > > >>>> internal
> > > > > > >>>>>>> sorter
> > > > > > >>>>>>>>>> when
> > > > > > >>>>>>>>>>>> its input has isBacklog=true.*
> > > > > > >>>>>>>>>>>> *Typically, operators that involve aggregation
> > operation
> > > > > > >>>>> (e.g.
> > > > > > >>>>>>>> join,
> > > > > > >>>>>>>>>>>> cogroup, aggregate) on keyed inputs can benefit from
> > > > > > >>> using
> > > > > > >>>> an
> > > > > > >>>>>>>>> internal
> > > > > > >>>>>>>>>>>> sorter."*
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> *"The operator that performs CoGroup operation will
> > > > > > >>>>> instantiate
> > > > > > >>>>>>> two
> > > > > > >>>>>>>>>>>> internal sorter to sorts records from its two inputs
> > > > > > >>>>>> separately.
> > > > > > >>>>>>>> Then
> > > > > > >>>>>>>>>> it
> > > > > > >>>>>>>>>>>> can pull the sorted records from these two sorters.
> > This
> > > > > > >>>> can
> > > > > > >>>>> be
> > > > > > >>>>>>>> done
> > > > > > >>>>>>>>>>>> without wrapping input records with
> TaggedUnion<...>.
> > In
> > > > > > >>>>>>>> comparison,
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>> existing DataStream#coGroup needs to wrap input
> > records
> > > > > > >>>> with
> > > > > > >>>>>>>>>>>> TaggedUnion<...> before sorting them using one
> > external
> > > > > > >>>>> sorter,
> > > > > > >>>>>>>> which
> > > > > > >>>>>>>>>>>> introduces higher overhead."*
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> According to the performance test, it seems that
> > > internal
> > > > > > >>>>>> sorter
> > > > > > >>>>>>>> has
> > > > > > >>>>>>>>>>> better
> > > > > > >>>>>>>>>>>> performance than external sorter. Is it possible to
> > make
> > > > > > >>>>> those
> > > > > > >>>>>>>>>> operators
> > > > > > >>>>>>>>>>>> that can benefit from it use internal sorter by
> > default?
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>> Yes, it is possible. After this FLIP is done, users
> can
> > > use
> > > > > > >>>>>>>>>>> DataStream#coGroup with EndOfStreamWindows as the
> > window
> > > > > > >>>>> assigner
> > > > > > >>>>>>> to
> > > > > > >>>>>>>>>>> co-group two streams in effectively the batch manner.
> > An
> > > > > > >>>>> operator
> > > > > > >>>>>>>> that
> > > > > > >>>>>>>>>> uses
> > > > > > >>>>>>>>>>> an internal sorter will be used to perform the
> co-group
> > > > > > >>>>>> operation.
> > > > > > >>>>>>>>> There
> > > > > > >>>>>>>>>> is
> > > > > > >>>>>>>>>>> no need for users of the DataStream API to explicitly
> > > know
> > > > > > >>> or
> > > > > > >>>>> set
> > > > > > >>>>>>> the
> > > > > > >>>>>>>>>>> internal sorter in anyway.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> In the future, we plan to incrementally optimize
> other
> > > > > > >>>>>> aggregation
> > > > > > >>>>>>>>>>> operation (e.g. aggregate) on the DataStream API when
> > > > > > >>>>>>>>> EndOfStreamWindows
> > > > > > >>>>>>>>>> is
> > > > > > >>>>>>>>>>> used as the window assigner.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>> Dong
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best regards,
> > > > > > >>>>>>>>>>>> Jing
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <
> > > > > > >>>>> lindon...@gmail.com>
> > > > > > >>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>> Hi Jing,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thank you for the comments! Please see my reply
> > inline.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Tue, Jul 11, 2023 at 5:41 AM Jing Ge
> > > > > > >>>>>>>> <j...@ververica.com.invalid
> > > > > > >>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Thanks for the proposal! The FLIP is already in
> good
> > > > > > >>>>>> shape. I
> > > > > > >>>>>>>> got
> > > > > > >>>>>>>>>>> some
> > > > > > >>>>>>>>>>>>> NIT
> > > > > > >>>>>>>>>>>>>> questions.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 1. It is a little bit weird to write the hint
> right
> > > > > > >>>> after
> > > > > > >>>>>> the
> > > > > > >>>>>>>>>>>> motivation
> > > > > > >>>>>>>>>>>>>> that some features have been moved to FLIP-331,
> > > > > > >>> because
> > > > > > >>>>> at
> > > > > > >>>>>>> that
> > > > > > >>>>>>>>>> time,
> > > > > > >>>>>>>>>>>>>> readers don't know the context about what features
> > > > > > >>> does
> > > > > > >>>>> it
> > > > > > >>>>>>>> mean.
> > > > > > >>>>>>>>> I
> > > > > > >>>>>>>>>>>> would
> > > > > > >>>>>>>>>>>>>> suggest moving the note to the beginning of
> "Public
> > > > > > >>>>>>> interfaces"
> > > > > > >>>>>>>>>>>> sections.
> > > > > > >>>>>>>>>>>>> Given that the reviewer who commented on this email
> > > > > > >>>> thread
> > > > > > >>>>>>>> before I
> > > > > > >>>>>>>>>>>>> refactored the FLIP (i.e. Piotr) has read FLP-331,
> I
> > > > > > >>>> think
> > > > > > >>>>> it
> > > > > > >>>>>>> is
> > > > > > >>>>>>>>>>> simpler
> > > > > > >>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>> just remove any mention of FLIP-331. I have updated
> > the
> > > > > > >>>>> FLIP
> > > > > > >>>>>>>>>>> accordingly.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 2. It is also a little bit weird to describe all
> > > > > > >>>>> behaviour
> > > > > > >>>>>>>>> changes
> > > > > > >>>>>>>>>> at
> > > > > > >>>>>>>>>>>>> first
> > > > > > >>>>>>>>>>>>>> but only focus on one single feature, i.e. how to
> > > > > > >>>>> implement
> > > > > > >>>>>>>>>>>>>> internalSorterSupported. TBH, I was lost while I
> was
> > > > > > >>>>>> reading
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>> Public
> > > > > > >>>>>>>>>>>>>> interfaces. Maybe change the FLIP title? Another
> > > > > > >>> option
> > > > > > >>>>>> could
> > > > > > >>>>>>>> be
> > > > > > >>>>>>>>> to
> > > > > > >>>>>>>>>>>>> write a
> > > > > > >>>>>>>>>>>>>> short summary of all features and point out that
> > this
> > > > > > >>>>> FLIP
> > > > > > >>>>>>> will
> > > > > > >>>>>>>>>> only
> > > > > > >>>>>>>>>>>>> focus
> > > > > > >>>>>>>>>>>>>> on the internalSorterSupported feature. Others
> could
> > > > > > >>> be
> > > > > > >>>>>> found
> > > > > > >>>>>>>> in
> > > > > > >>>>>>>>>>>>> FLIP-331.
> > > > > > >>>>>>>>>>>>>> WDYT?
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Conceptually, the purpose of this FLIP is to allow
> a
> > > > > > >>>> stream
> > > > > > >>>>>>> mode
> > > > > > >>>>>>>>> job
> > > > > > >>>>>>>>>> to
> > > > > > >>>>>>>>>>>> run
> > > > > > >>>>>>>>>>>>> parts of the topology in batch mode so that it can
> > > > > > >>> apply
> > > > > > >>>>>>>>>>>>> optimizations/computations that can not be used
> > > > > > >>> together
> > > > > > >>>>> with
> > > > > > >>>>>>>>>>>> checkpointing
> > > > > > >>>>>>>>>>>>> (and thus not usable in stream mode). Although
> > internal
> > > > > > >>>>>> sorter
> > > > > > >>>>>>> is
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>>> only
> > > > > > >>>>>>>>>>>>> optimization immediately supported in this FLIP,
> this
> > > > > > >>>> FLIP
> > > > > > >>>>>> lays
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>>>> foundation to support other optimizations in the
> > > > > > >>> future,
> > > > > > >>>>> such
> > > > > > >>>>>>> as
> > > > > > >>>>>>>>>> using
> > > > > > >>>>>>>>>>>> GPU
> > > > > > >>>>>>>>>>>>> to process a bounded stream of records.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Therefore, I find it better to keep the current
> title
> > > > > > >>>>> rather
> > > > > > >>>>>>> than
> > > > > > >>>>>>>>>>>> limiting
> > > > > > >>>>>>>>>>>>> the scope to internal sorter. What do you think?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> 3. There should be a typo at 4) Checkpoint and
> > > > > > >>> failover
> > > > > > >>>>>>>> strategy
> > > > > > >>>>>>>>> ->
> > > > > > >>>>>>>>>>>> Mixed
> > > > > > >>>>>>>>>>>>>> mode ->
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>    - If any task fails when isBacklog=false true,
> > > > > > >>> this
> > > > > > >>>>> task
> > > > > > >>>>>>> is
> > > > > > >>>>>>>>>>>> restarted
> > > > > > >>>>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>>>    re-process its input from the beginning.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Thank you for catching this issue. It is fixed now.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>> Dong
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Best regards
> > > > > > >>>>>>>>>>>>>> Jing
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <
> > > > > > >>>>>> lindon...@gmail.com
> > > > > > >>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>> Hi Piotr,
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Thanks for your comments! Please see my reply
> > > > > > >>> inline.
> > > > > > >>>>>>>>>>>>>>> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <
> > > > > > >>>>>>>>>>>>> piotr.nowoj...@gmail.com
> > > > > > >>>>>>>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Hi Dong,
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> I have a couple of questions.
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>> Could you explain why those properties
> > > > > > >>>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>>     @Nullable private Boolean isOutputOnEOF =
> > > > > > >>> null;
> > > > > > >>>>>>>>>>>>>>>>     @Nullable private Boolean
> > > > > > >>> isOutputOnCheckpoint
> > > > > > >>>> =
> > > > > > >>>>>>> null;
> > > > > > >>>>>>>>>>>>>>>>     @Nullable private Boolean
> > > > > > >>>>>> isInternalSorterSupported =
> > > > > > >>>>>>>>> null;
> > > > > > >>>>>>>>>>>>>>>> must be `@Nullable`, instead of having the
> > > > > > >>> default
> > > > > > >>>>>> value
> > > > > > >>>>>>>> set
> > > > > > >>>>>>>>> to
> > > > > > >>>>>>>>>>>>>> `false`?
> > > > > > >>>>>>>>>>>>>>> By initializing these private variables in
> > > > > > >>>>>>>>>>> OperatorAttributesBuilder
> > > > > > >>>>>>>>>>>> as
> > > > > > >>>>>>>>>>>>>>> null, we can implement
> > > > > > >>>>>> `OperatorAttributesBuilder#build()`
> > > > > > >>>>>>> in
> > > > > > >>>>>>>>>> such
> > > > > > >>>>>>>>>>> a
> > > > > > >>>>>>>>>>>>> way
> > > > > > >>>>>>>>>>>>>>> that it can print DEBUG level logging to say
> > > > > > >>>>>>>>>> "isOutputOnCheckpoint
> > > > > > >>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>> not
> > > > > > >>>>>>>>>>>>>>> explicitly set". This can help user/SRE debug
> > > > > > >>>>> performance
> > > > > > >>>>>>>>> issues
> > > > > > >>>>>>>>>>> (or
> > > > > > >>>>>>>>>>>>> lack
> > > > > > >>>>>>>>>>>>>>> of the expected optimization) due to operators
> not
> > > > > > >>>>>>> explicitly
> > > > > > >>>>>>>>>>> setting
> > > > > > >>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> right operator attribute.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> For example, we might want a job to always use
> the
> > > > > > >>>>> longer
> > > > > > >>>>>>>>>>>> checkpointing
> > > > > > >>>>>>>>>>>>>>> interval (i.e.
> > > > > > >>>>>>>> execution.checkpointing.interval-during-backlog)
> > > > > > >>>>>>>>>> if
> > > > > > >>>>>>>>>>>> all
> > > > > > >>>>>>>>>>>>>>> running operators have
> isOutputOnCheckpoint==false,
> > > > > > >>>> and
> > > > > > >>>>>> use
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>>>> short
> > > > > > >>>>>>>>>>>>>>> checkpointing interval otherwise. If a user has
> > > > > > >>>>>> explicitly
> > > > > > >>>>>>>>>>> configured
> > > > > > >>>>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog
> but
> > > > > > >>>> the
> > > > > > >>>>>>>>> two-phase
> > > > > > >>>>>>>>>>>>> commit
> > > > > > >>>>>>>>>>>>>>> sink library has not been upgraded to set
> > > > > > >>>>>>>>>>> isOutputOnCheckpoint=true,
> > > > > > >>>>>>>>>>>>> then
> > > > > > >>>>>>>>>>>>>>> the job will end up using the long checkpointing
> > > > > > >>>>>> interval,
> > > > > > >>>>>>>> and
> > > > > > >>>>>>>>> it
> > > > > > >>>>>>>>>>>> will
> > > > > > >>>>>>>>>>>>> be
> > > > > > >>>>>>>>>>>>>>> useful to figure out what is going wrong in this
> > > > > > >>> case
> > > > > > >>>>> by
> > > > > > >>>>>>>>> checking
> > > > > > >>>>>>>>>>> the
> > > > > > >>>>>>>>>>>>>> log.
> > > > > > >>>>>>>>>>>>>>> Note that the default value of these fields of
> the
> > > > > > >>>>>>>>>>> OperatorAttributes
> > > > > > >>>>>>>>>>>>>>> instance built by OperatorAttributesBuilder will
> > > > > > >>>> still
> > > > > > >>>>> be
> > > > > > >>>>>>>>> false.
> > > > > > >>>>>>>>>>> The
> > > > > > >>>>>>>>>>>>>>> following is mentioned in the Java doc of
> > > > > > >>>>>>>>>>>>>>> `OperatorAttributesBuilder#build()`:
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> /**
> > > > > > >>>>>>>>>>>>>>>   * If any operator attribute is null, we will
> log
> > > > > > >>> it
> > > > > > >>>>> at
> > > > > > >>>>>>>> DEBUG
> > > > > > >>>>>>>>>>> level
> > > > > > >>>>>>
> > >
> >
>

Reply via email to