Hi Xintong,

Thank you for all the comments. Please see my reply inline.


On Mon, Sep 18, 2023 at 11:31 AM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks for addressing my comments, Dong.
>
> The expected behavior of checkpointing and failover depends on whether
> > there is any operator currently running in the job with all its inputs'
> > isBacklog=true. If there exists such an operator and
> > interval-during-backlog = 0, then checkpoint will be disabled and the
> > operator will have to failover in a way similar to batch mode.
>
>
> This makes sense to me. Shall we also put this into the FLIP? Or maybe you
> already did that and I overlooked it? The current description in "4)
> Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to me.
> It says "At the point when isBacklog switches to false, source operator
> ...", which sounds like upon any source operator switching to isBacklog =
> false.
>


I think this is partially covered by the doc
of execution.checkpointing.interval-during-backlog, which says "if it is
not null and any source reports isProcessingBacklog=true, it is the
interval...".

Based on this doc, we can derive that if there is any source reporting
isBacklog=true, then the checkpointing interval is determined by
interval-during-backlog, which in this case has the value 0, indicating that
checkpoint triggering is disabled.

Given that other readers might also have this question, I have updated
FLIP-327 with the following statement to make it more explicit: "For jobs
with multiple sources and execution.checkpointing.interval-during-backlog =
0, checkpoint triggering is enabled if and only if all sources have
isBacklog=false".



> I am not sure what is the concern with having `flink-streaming-java` depend
> > on `flink-runtime`. Can you clarify the exact concern?
> >
>
> The concern here is that an API module should not depend on a runtime
> module. Currently, we have the "user codes -> flink-streaming-java ->
> flink-runtime" dependency chain, which makes binary compatibility
> impossible because any runtime changes can break the compatibility with a
> user jar (which bundles flink-streaming-java) compiled for an older
> version. Ideally, we want the runtime module to depend on the API module,
> rather than the other way around. This is one of the issues we are trying
> to resolve with the programmatic API refactor. However, the way we are
> trying to resolve it is to introduce another API module and gradually
> replace the current DataStream API / flink-streaming-java, which means
> flink-streaming-java will stay depending on flink-runtime for a while
> anyway. So the concern here is minor, only that we might need more effort
> when reworking this with the new API.
>

Thanks for the detailed explanation. Given that we plan to avoid having
flink-streaming-java depend on flink-runtime, I agree it is preferable to
avoid introducing more dependencies like this.

I have updated the FLIP to let RecordAttributes extend StreamElement.
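
For clarity, the updated hierarchy looks roughly like the sketch below. This
is only an illustration under the assumption that isBacklog is the sole
attribute carried for now; the actual class may differ:

    // Sketch: RecordAttributes now extends flink-streaming-java's
    // StreamElement instead of flink-runtime's RuntimeEvent, so the API
    // module does not need the extra dependency on the runtime module.
    public class RecordAttributes extends StreamElement {

        private final boolean backlog;

        public RecordAttributes(boolean backlog) {
            this.backlog = backlog;
        }

        // Hint that operators may consume via processRecordAttributes(...).
        public boolean isBacklog() {
            return backlog;
        }
    }

As noted further down in this thread, the trade-off is that we now add
helper methods such as StreamElement#isRecordAttributes and
StreamElement#asRecordAttributes, which is the minor implementation cost
mentioned there.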

Best,
Dong


> The rest of your replies make sense to me.
>
> Best,
>
> Xintong
>
>
>
> On Fri, Sep 15, 2023 at 10:05 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Xintong,
> >
> > Thanks for your comments! Please see my reply inline.
> >
> > On Thu, Sep 14, 2023 at 4:58 PM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> > > Sorry to join the discussion late.
> > >
> > > Overall, I think it's a good idea to support dynamically switching the
> > > operator algorithms between Streaming (optimized towards low latency +
> > > checkpointing support) and Batch (optimized towards throughput). This is
> > > indeed a big and complex topic, and I really appreciate the previous
> > > discussions that narrowed the scope of this FLIP down to only considering
> > > switching from Batch to Streaming as a first step.
> > >
> > > I have several questions.
> > >
> > > 1. The FLIP discusses various behaviors under 4 scenarios: streaming mode,
> > > batch mode, mixed mode with checkpoint interval > 0, and mixed mode with
> > > checkpoint interval = 0. IIUC, this is because many batch optimizations
> > > cannot be supported together with checkpointing. This justifies that in
> > > mixed mode with interval > 0, most behaviors are the same as in streaming
> > > mode. However, mixed mode with checkpoint interval = 0 does not
> > > necessarily mean we should apply such optimizations. It is possible that
> > > in some cases (likely with small data amounts) the cost of such
> > > optimizations is higher than the benefit. Therefore, I'd suggest
> > > decoupling the concept of applying these optimizations (i.e., the batch
> > > execution phase in the mixed mode) from whether checkpointing is enabled
> > > or not. In particular, I'd suggest removing the scenario "mixed mode with
> > > e.c.interval-during-backlog > 0", changing the scenario "mixed mode with
> > > e.c.interval-during-backlog = 0" to simply "mixed mode", and saying that
> > > we can have different strategies for deciding whether to enable the mixed
> > > mode, and that as the first step the strategy is to enable it when
> > > e.c.interval-during-backlog = 0.
> > >
> >
> > Thanks for the detailed explanation!
> >
> > I have updated the "Behavior changes when switching from batch mode to
> > stream mode" section with the following changes.
> >
> > 1) Remove the description of "mixed mode with interval-during-backlog > 0"
> > and add the statement saying that "after this FLIP, the behavior of Flink
> > runtime with execution.runtime-mode = streaming AND
> > execution.checkpointing.interval-during-backlog > 0 will be the same as the
> > stream mode prior to this FLIP"
> >
> > 2) Add the statement saying that "Mixed mode refers to the behavior of
> > Flink runtime after this FLIP with execution.runtime-mode = streaming AND
> > execution.checkpointing.interval-during-backlog = 0".
> >
> > 3) Add the statement saying that "It is possible for mixed mode to be
> > slower than stream mode, particularly when there is only a small amount of
> > input records and the overhead of buffering/sorting inputs outweighs its
> > benefit. This is similar to how a merge join might be slower than a hash
> > join. This FLIP focuses on optimizing the Flink throughput when there is a
> > large number of input records. In the future, we might introduce more
> > strategies to turn on mixed mode in a smart way to avoid performance
> > regressions".
> >
> > Would this address your concern?

>
> >
> > >
> > > 2. According to the FLIP, before isBacklog = false, the timer service
> > > only keeps timers for the current key. It also says that upon the end of
> > > each key, it fires the timers of that key up to the last watermark. IIUC,
> > > that means not all timers are guaranteed to be fired. It is possible that
> > > some timers are left to be triggered after isBacklog switches to false.
> > > If the timer service only keeps timers for the current key, those unfired
> > > timers may get lost when switching to a new key.
> > >
> >
> > Thanks for catching this. You are right that all timers should be fired
> as
> > long as the corresponding firing condition (either processing-time or
> > event-time) is satisfied.
> >
> > I have updated the "Timer Service" part of the "Behavior changes when
> > switching from batch mode to stream mode" section accordingly. Can you
> see
> > if it addresses your concern?
> >
> >
> > >
> > > 3. Is it possible that some sources / operators in the job switch to
> > > isBacklog = false, while others are still isBacklog = true? In that
> case,
> > > what is the expected behavior for checkpointing and failover?
> > >
> >
> > Yes, it is possible. And in this case, Flink runtime will handle this
> > operator as if all the operator's inputs have isBacklog=false. In
> > particular, Flink runtime will not automatically sort inputs of this
> > operator.
> >
> > I added the following statement in the FLIP to clarify the behavior: "For
> > an operator with 2+ inputs, where some inputs have isBacklog=true and
> some
> > other inputs have isBacklog=false, Flink runtime will handle this
> operator
> > as if all its inputs have isBacklog=false".
> >
> > The expected behavior of checkpointing and failover depends on whether
> > there is any operator currently running in the job with all its inputs'
> > isBacklog=true. If there exists such an operator
> > and interval-during-backlog = 0, then checkpoint will be disabled and the
> > operator will have to failover in a way similar to batch mode.
> >
> >
> >
> > >
> > > 4. Do we require RecordAttributes to be properly handled by all
> > > operators? Or do we consider it a hint that operators may benefit from
> > > looking into, but should not run into any problems ignoring? I'm asking
> > > because, if they are required to be properly handled, we probably need a
> > > way to ensure that operators deal with it.
> > > `processRecordAttributes(RecordAttributes)` might not be a good fit
> > > because we don't know whether the operator has looked into all necessary
> > > fields of `RecordAttributes`.
> > >
> >
> > As of this FLIP, we would not require RecordAttributes to be handled by
> > any operator in order to achieve correctness. So it is more like a hint.
> > More specifically, the isBacklog attribute provides a hint for an operator
> > to optionally delay the processing of its inputs if doing so can improve
> > its throughput.
> >
> > I think it would be useful to avoid requiring operators to explicitly
> > handle attributes contained in RecordAttributes. This is because we want
> > the features added in this FLIP (and future FLIPs) to be backward
> > compatible without breaking the correctness of existing jobs.
> >
> > If we really need to add a record attribute that should be explicitly
> > handled by every operator, I believe we can always find a way to enforce
> > this requirement (e.g. fail job compilation with a proper error) in the
> > future. For example, we can add a method such as
> > handleXXXRecordAttribute() in the operator interface without a default
> > implementation.
> >
> >
> > >
> > > 5. I wonder if there's any strong reason behind choosing `RuntimeEvent`
> > > over `StreamElement` for `RecordAttributes` to extend? My concern is
> > > that the current approach introduces one more dependency from
> > > `flink-streaming-java` (operators that use `RecordAttributes`) to
> > > `flink-runtime` (where `RuntimeEvent` comes from), which seems to be
> > > unnecessary.
> > >
> >
> > There is no strong reason to choose `RuntimeEvent` over `StreamElement`.
> > I think the main (and minor) reason for doing so is the simplicity of
> > implementation. For example, we don't need to add methods such as
> > StreamElement#isRecordAttributes and StreamElement#asRecordAttributes.
> >
> > I am not sure what is the concern with having `flink-streaming-java`
> depend
> > on `flink-runtime`. Can you clarify the exact concern?
> >
> > In any case, I don't have a strong preference between `RuntimeEvent` and
> > `StreamElement`. We can update the FLIP to use `StreamElement` as long as
> > there is a well-defined non-trivial reason for making this choice.
> >
> >
> > > 6. The FLIP says it leverages state backend optimizations introduced in
> > > FLIP-325. Just for clarification, does this mean this FLIP is depending
> > on
> > > FLIP-325, and probably should not be voted / accepted until FLIP-325 is
> > > accepted?
> > >
> >
> > Yes, the proposed implementation of this FLIP depends on FLIP-325. We can
> > start the voting thread for FLIP-327 after FLIP-325 is accepted. Maybe we
> > can continue to discuss this FLIP in the meantime (before FLIP-325 is
> > accepted).
> >
> > Thanks again for the very detailed and helpful review! Looking forward to
> > your follow-up comments.
> >
> > Best,
> > Dong
> >
> >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Fri, Sep 1, 2023 at 12:48 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > > Hi,
> > > >
> > > > some keywords in this caught my attention, so sorry for jumping in
> > > > late, but I'd like to understand the nature of the proposal.
> > > >
> > > > I'll try to summarize my understanding:
> > > >
> > > > The goal of the FLIP is to support automatic switching between
> > > > streaming and batch processing, leveraging the fact that batch
> > > > processing is more computationally efficient. This makes perfect sense.
> > > >
> > > > Looking at the streaming vs. batch semantics, switching from
> streaming
> > > > to batch means the following:
> > > >
> > > >   a) generally, watermarks are not propagated in batch; the watermark
> > > > moves from -inf to +inf in one step at the end of the batch input,
> > > > which might (and probably will) skip many timer invocations
> > > >
> > > >   b) grouping by key (and window) can be done efficiently, because it
> > > > can be done by sort-group and ideally parallelized by window (with some
> > > > caveats)
> > > >
> > > > The switch also has some conditions, namely:
> > > >
> > > >   i) batch mode does not do checkpoints, inputs must be accessible
> > > > repeatedly (forever)
> > > >
> > > >   ii) due to failures in batch mode, inputs might be reprocessed and
> > > > thus must be immutable or all sub-results computed in all branches of
> > > > the computation (even possibly unaffected by the failure) have to be
> > > > discarded and recomputed from scratch
> > > >
> > > > Obviously, in the case of the switch from batch to streaming, property
> > > > a) has to be modified so the watermark does not move to +inf, but to
> > > > min(streaming watermark). Given these properties, it should be possible
> > > > to exchange batch and streaming processing without any cooperation with
> > > > the application logic itself. Is my understanding correct?
> > > >
> > > > If so, there is still one open question regarding efficiency, though.
> > > > The streaming operator _might_ need sorting by timestamp (e.g. when
> > > > processing time-series data, or even sequential data). In that case,
> > > > simply switching streaming semantics to batch processing does not yield
> > > > efficient processing, because the operator still needs to buffer and
> > > > manually sort all the input data (batch data is always unordered). On
> > > > the other hand, the batch runner already does sorting (for grouping by
> > > > key), so adding an additional sorting criterion is very cheap. In Apache
> > > > Beam, we introduced a property of a stateful PTransform (DoFn) called
> > > > @RequiresTimeSortedInput [1], which can then be implemented efficiently
> > > > by batch engines.
> > > >
> > > > Does the FLIP somehow work with conditions i) and ii)? I can imagine
> > for
> > > > instance that if data is read from say Kafka, then if backlog gets
> > > > sufficiently large, then even the batch processing can take
> substantial
> > > > time and if it fails after long processing, some of the original data
> > > > might be already rolled out from Kafka topic.
> > > >
> > > > In the FLIP there are some proposed changes to sources to emit metadata
> > > > about whether the records come from backlog. What is the line of thought
> > > > behind why this is needed? From my point of view, streaming engines are
> > > > _always_ processing backlog; the only question is "how delayed are the
> > > > currently processed events after HEAD", or more specifically in this
> > > > case "how many elements can we expect to process if the source would
> > > > immediately stop receiving more data?". This should be configurable
> > > > using a simple option defining the difference between the current
> > > > processing-time (JM) and the watermark of the source, or am I missing
> > > > something?
> > > >
> > > > Thanks for clarification and all the best,
> > > >
> > > >   Jan
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> > > >
> > > > On 8/31/23 13:17, Xuannan Su wrote:
> > > > > Hi all,
> > > > >
> > > > > I would like to share some updates on FLIP-327. Dong and I have
> had a
> > > > > series of discussions and have made several refinements to the
> FLIP.
> > > > >
> > > > > The major change to the FLIP is to allow the input of the one-input
> > > > > operator to be automatically sorted during backlog processing. When
> > > > > combined with the state backend optimization introduced in FLIP-325
> > > [1],
> > > > > all the keyed single-input operators can achieve similar
> performance
> > as
> > > > in
> > > > > batch mode during backlog processing without any code change to the
> > > > > operator. We also implemented a POC[2] and conducted a benchmark[3]
> > > > > using the KeyedStream#reduce operation. The benchmark results
> > > > > demonstrate the
> > > > > performance gains that this FLIP can offer.
> > > > >
> > > > > I am looking forward to any comments or feedback you may have on
> this
> > > > FLIP.
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
> > > > > [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
> > > > > [3]
> > > > >
> > > >
> > >
> >
> https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java
> > > > >
> > > > >
> > > > >
> > > > >> On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>
> > > > >> Hi Piotr,
> > > > >>
> > > > >> Thanks for the explanation.
> > > > >>
> > > > >> To recap our offline discussion, there is a concern regarding the
> > > > >> capability to dynamically switch between stream and batch modes.
> > This
> > > > >> concern is around unforeseen behaviors such as bugs or performance
> > > > >> regressions, which we might not be aware of yet. The reason for this
> > > > >> concern is that this feature involves a fundamental impact on the
> > > > >> Flink runtime's behavior.
> > > > >>
> > > > >> Due to the above concern, I agree it is reasonable to annotate
> > related
> > > > > APIs
> > > > >> as experimental. This step would provide us with the flexibility
> to
> > > > modify
> > > > >> these APIs if issues arise in the future. This annotation also
> > serves
> > > > as a
> > > > >> note to users that this functionality might not perform as well as
> > > > >> expected.
> > > > >>
> > > > >> Though I believe that we can ensure the reliability of this
> feature
> > > > > through
> > > > >> good design and code reviews, comprehensive unit tests, and
> thorough
> > > > >> integration testing, I agree that it is reasonable to be extra
> > > cautious
> > > > in
> > > > >> this case. Also, it should be OK to delay making these APIs
> > > > >> non-experimental by 1-2 releases.
> > > > >>
> > > > >> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in
> > these
> > > > docs
> > > > >> as experimental. Please let me know if you think any other API
> > should
> > > > also
> > > > >> be marked as experimental.
> > > > >>
> > > > >> Thanks!
> > > > >> Dong
> > > > >>
> > > > >> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <
> > > > piotr.nowoj...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Dong,
> > > > >>>
> > > > >>> The Operators API is unfortunately also our public-facing API, and
> > > > >>> I mean that the APIs we will add there should also be marked
> > > > >>> `@Experimental` IMO.
> > > > >>>
> > > > >>> The config options should also be marked as experimental (both
> > > > >>> annotated @Experimental and noted the same thing in the docs,
> > > > >>> if @Experimental annotation is not automatically mentioned in the
> > > > docs).
> > > > >>>
> > > > >>>> Alternatively, how about we add a doc for
> > > > >>> checkpointing.interval-during-backlog explaining its
> impact/concern
> > > as
> > > > >>> discussed above?
> > > > >>>
> > > > >>> We should do this independently from marking the APIs/config
> > options
> > > as
> > > > >>> `@Experimental`
> > > > >>>
> > > > >>> Best,
> > > > >>> Piotrek
> > > > >>>
> > > > >>> pt., 11 sie 2023 o 14:55 Dong Lin <lindon...@gmail.com>
> > napisał(a):
> > > > >>>
> > > > >>>> Hi Piotr,
> > > > >>>>
> > > > >>>> Thanks for the reply!
> > > > >>>>
> > > > >>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <
> > > > piotr.nowoj...@gmail.com
> > > > >>>>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Hi,
> > > > >>>>>
> > > > >>>>> Sorry for the long delay in responding!
> > > > >>>>>
> > > > >>>>>> Given that it is an optional feature that can be
> > > > >>>>>> turned off by users, it might be OK to just let users try it
> out
> > > and
> > > > >>> we
> > > > >>>>> can
> > > > >>>>>> fix performance issues once we detect any of them. What do you
> > > > think?
> > > > >>>>> I think it's fine. It would be best to mark this feature as
> > > > >>> experimental,
> > > > >>>>> and
> > > > >>>>> we say that the config keys or the default values might change
> in
> > > the
> > > > >>>>> future.
> > > > >>>>>
> > > > >>>> In general I agree we can mark APIs that determine "whether to
> > > enable
> > > > >>>> dynamic switching between stream/batch mode" as experimental.
> > > > >>>>
> > > > >>>> However, I am not sure we have such an API yet. The APIs added
> in
> > > this
> > > > >>> FLIP
> > > > >>>> are intended to be used by operator developers rather than end
> > > users.
> > > > > End
> > > > >>>> users can enable this capability by setting
> > > > >>>> execution.checkpointing.interval-during-backlog = Long.MAX and using
> > > > >>>> a source which might implicitly set the backlog status (e.g.
> > > > >>>> HybridSource). So
> > > > >>>> execution.checkpointing.interval-during-backlog is the only
> > > > >>>> user-facing API that can always control whether this feature can be
> > > > >>>> used.
> > > > >>>>
> > > > >>>> However, execution.checkpointing.interval-during-backlog itself
> is
> > > not
> > > > >>> tied
> > > > >>>> to FLIP-327.
> > > > >>>>
> > > > >>>> Do you mean we should set checkpointing.interval-during-backlog
> as
> > > > >>>> experimental? Alternatively, how about we add a doc for
> > > > >>>> checkpointing.interval-during-backlog explaining its
> > impact/concern
> > > as
> > > > >>>> discussed above?
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Dong
> > > > >>>>
> > > > >>>>
> > > > >>>>>> Maybe we can revisit the need for such a config when we
> > > > >>>> introduce/discuss
> > > > >>>>>> the capability to switch backlog from false to true in the
> > future.
> > > > >>> What
> > > > >>>>> do
> > > > >>>>>> you think?
> > > > >>>>> Sure, we can do that.
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Piotrek
> > > > >>>>>
> > > > >>>>> niedz., 23 lip 2023 o 14:32 Dong Lin <lindon...@gmail.com>
> > > > napisał(a):
> > > > >>>>>
> > > > >>>>>> Hi Piotr,
> > > > >>>>>>
> > > > >>>>>> Thanks a lot for the explanation. Please see my reply inline.
> > > > >>>>>>
> > > > >>>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> > > > >>>>> piotr.nowoj...@gmail.com>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Dong,
> > > > >>>>>>>
> > > > >>>>>>> Thanks a lot for the answers. I can now only briefly answer
> > your
> > > > >>> last
> > > > >>>>>>> email.
> > > > >>>>>>>
> > > > >>>>>>>> It is possible that spilling to disks might cause larger
> > > > >>> overhead.
> > > > >>>>> IMO
> > > > >>>>>> it
> > > > >>>>>>>> is an orthogonal issue already existing in Flink. This is
> > > > >>> because a
> > > > >>>>>> Flink
> > > > >>>>>>>> job running batch mode might also be slower than its
> > throughput
> > > > >>> in
> > > > >>>>>> stream
> > > > >>>>>>>> mode due to the same reason.
> > > > >>>>>>> Yes, I know, but the thing that worries me is that previously
> > > only
> > > > >>> a
> > > > >>>>> user
> > > > >>>>>>> alone
> > > > >>>>>>> could decide whether to use batch mode or streaming, and in
> > > > >>> practice
> > > > >>>>> one
> > > > >>>>>>> user would rarely (if ever) use both for the same
> > > > >>> problem/job/query.
> > > > >>>> If
> > > > >>>>>> his
> > > > >>>>>>> intention was to eventually process live data, he was using
> > > > >>> streaming
> > > > >>>>>> even
> > > > >>>>>>> if there was a large backlog at the start (apart from a very
> > > > >>>>>>> few power users).
> > > > >>>>>>>
> > > > >>>>>>> With this change, we want to introduce a mode that would be
> > > > >>> switching
> > > > >>>>>> back
> > > > >>>>>>> and forth between streaming and "batch in streaming"
> > > automatically.
> > > > >>>> So
> > > > >>>>> a
> > > > >>>>>>> potential performance regression would be much more visible
> and
> > > > >>>> painful
> > > > >>>>>>> at the same time. If a batch query runs slower than it could,
> > > > >>>>>>> it's kind of fine, as it will end at some point. If a streaming
> > > > >>>>>>> query switches to batch processing during large backpressure,
> > > > >>>>>>> maybe a temporary load spike, that's a bigger deal.
> > > > >>>>>>> Especially if batch processing mode will not be able to
> > actually
> > > > >>> even
> > > > >>>>>>> handle
> > > > >>>>>>> the normal load, after the load spike. In that case, the job
> > > could
> > > > >>>>> never
> > > > >>>>>>> recover
> > > > >>>>>>> from the backpressure/backlog mode.
> > > > >>>>>>>
> > > > >>>>>> I understand you are concerned with the risk of performance
> > > > >>> regression
> > > > >>>>>> introduced due to switching to batch mode.
> > > > >>>>>>
> > > > >>>>>> After thinking about this more, I think this existing proposal
> > > meets
> > > > >>>> the
> > > > >>>>>> minimum requirement of "not introducing regression for
> existing
> > > > >>> jobs".
> > > > >>>>> The
> > > > >>>>>> reason is that even if batch mode can be slower than stream
> mode
> > > for
> > > > >>>> some
> > > > >>>>>> operators in some cases, this is an optional feature that will
> > > only
> > > > >>> be
> > > > >>>>>> enabled if a user explicitly overrides the newly introduced
> > config
> > > > to
> > > > >>>>>> non-default values. Existing jobs that simply upgrade their
> > Flink
> > > > >>>> library
> > > > >>>>>> version will not suffer any performance regression.
> > > > >>>>>>
> > > > >>>>>> More specifically, in order to switch to batch mode, users
> will
> > > need
> > > > >>> to
> > > > >>>>>> explicitly set execution.checkpointing.interval-during-backlog
> > to
> > > 0.
> > > > >>>> And
> > > > >>>>>> users can always explicitly update
> > > > >>>>>> execution.checkpointing.interval-during-backlog to turn off
> the
> > > > batch
> > > > >>>>> mode
> > > > >>>>>> if that incurs any performance issue.
> > > > >>>>>>
> > > > >>>>>> As far as I can tell, for all practical workloads we see in
> > > > >>> production
> > > > >>>>>> jobs, batch mode is always faster (w.r.t. throughput) than
> > stream
> > > > >>> mode
> > > > >>>>> when
> > > > >>>>>> there is a high backlog of incoming records. Though it is
> still
> > > > >>>>>> theoretically possible, it should be very rare (if any) for
> > batch
> > > > >>> mode
> > > > >>>> to
> > > > >>>>>> be slower in practice. Given that it is an optional feature
> that
> > > can
> > > > >>> be
> > > > >>>>>> turned off by users, it might be OK to just let users try it
> out
> > > and
> > > > >>> we
> > > > >>>>> can
> > > > >>>>>> fix performance issues once we detect any of them. What do you
> > > > think?
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>>> execution.backlog.use-full-batch-mode-on-start (default
> false)
> > > > >>>>>>> oops, sorry, it was supposed to be sth like:
> > > > >>>>>>>
> > > > >>>>>>> execution.backlog.use-batch-mode-only-on-start (default
> false)
> > > > >>>>>>>
> > > > >>>>>>> That option would disallow switching from streaming to batch.
> > > Batch
> > > > >>>>> mode
> > > > >>>>>>> would be allowed only to get rid of the initial backlog present
> > > > >>>>>>> on start-up.
> > > > >>>>>>>
> > > > >>>>>>> Would allow us to safely experiment with switching from
> > streaming
> > > > >>> to
> > > > >>>>>> batch
> > > > >>>>>>> and I would be actually more fine in enabling "using batch
> mode
> > > on
> > > > >>>>> start"
> > > > >>>>>>> by default, until we gain confidence and feedback that
> > switching
> > > > >>>> back &
> > > > >>>>>>> forth
> > > > >>>>>>> is working as expected.
> > > > >>>>>>>
> > > > >>>>>> Now I understand what you are suggesting. I agree that it is
> > > > >>> necessary
> > > > >>>>> for
> > > > >>>>>> users to be able to disallow switching from streaming to
> batch.
> > > > >>>>>>
> > > > >>>>>> I am not sure it is necessary to introduce an extra config
> just
> > > for
> > > > >>>> this
> > > > >>>>>> purpose. The reason is that we don't have any strategy that
> > > switches
> > > > >>>>>> backlog status from false to true yet. And when we have such
> > > > strategy
> > > > >>>>> (e.g.
> > > > >>>>>> FLIP-328) in the future, it is very likely that we will
> > introduce
> > > > >>> extra
> > > > >>>>>> config(s) for users to explicitly turn on such a feature. That
> > > means
> > > > >>>> user
> > > > >>>>>> should be able to turn off this feature even if we don't have
> > > > >>> something
> > > > >>>>>> like execution.backlog.use-batch-mode-only-on-start.
> > > > >>>>>>
> > > > >>>>>> Maybe we can revisit the need for such a config when we
> > > > >>>> introduce/discuss
> > > > >>>>>> the capability to switch backlog from false to true in the
> > future.
> > > > >>> What
> > > > >>>>> do
> > > > >>>>>> you think?
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>>>> Or we could limit the scope of this FLIP to only support
> > > > >>> starting
> > > > >>>>> with
> > > > >>>>>>>>> batch mode and switching only once to
> > > > >>>>>>>>> streaming, and design a follow up with switching back and
> > > forth?
> > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP
> > into
> > > > >>>> two
> > > > >>>>>>> FLIPs
> > > > >>>>>>>> so that we can make incremental progress.
> > > > >>>>>>> Great, let's do that. In a follow up FLIP we could restart
> the
> > > > >>>>> discussion
> > > > >>>>>>> about
> > > > >>>>>>> switching back and forth.
> > > > >>>>>>>
> > > > >>>>>> Cool, I added the following statement to the motivation
> section.
> > > > >>>>>>
> > > > >>>>>> "NOTE: this FLIP focuses only on the capability to switch from
> > > batch
> > > > >>> to
> > > > >>>>>> stream mode. If there is any extra API needed to support
> > switching
> > > > >>> from
> > > > >>>>>> stream to batch mode, we will discuss them in a follow-up
> FLIP."
> > > > >>>>>>
> > > > >>>>>> I am looking forward to reading your follow-up thoughts!
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Dong
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>> Piotrek
> > > > >>>>>>>
> > > > >>>>>>> czw., 20 lip 2023 o 16:57 Dong Lin <lindon...@gmail.com>
> > > > >>> napisał(a):
> > > > >>>>>>>> Hi Piotr,
> > > > >>>>>>>>
> > > > >>>>>>>> Thank you for the very detailed comments! Please see my
> reply
> > > > >>>> inline.
> > > > >>>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <
> > > > >>>>>>> piotr.nowoj...@gmail.com>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Dong,
> > > > >>>>>>>>>
> > > > >>>>>>>>> I have a couple of follow up questions about switching back
> > and
> > > > >>>>> forth
> > > > >>>>>>>>> between streaming and batching mode.
> > > > >>>>>>>>> Especially around shuffle/watermark strategy, and keyed
> state
> > > > >>>>>> backend.
> > > > >>>>>>>>> First of all, it might not always be beneficial to switch
> > into
> > > > >>>> the
> > > > >>>>>>> batch
> > > > >>>>>>>>> modes:
> > > > >>>>>>>>> - Shuffle strategy
> > > > >>>>>>>>>     - Is sorting going to be purely in-memory? If not,
> > > > >>> obviously
> > > > >>>>>>> spilling
> > > > >>>>>>>>> to disks might cause larger overheads
> > > > >>>>>>>>>        compared to not sorting the records.
> > > > >>>>>>>>>
> > > > >>>>>>>> Sorting might require spilling data to disk depending on the
> > > > >>> input
> > > > >>>>>> size.
> > > > >>>>>>>> The behavior of sorting w.r.t. memory/disk is expected to be
> > > > >>>> exactly
> > > > >>>>>> the
> > > > >>>>>>>> same as the behavior of input sorting automatically
> performed
> > by
> > > > >>>>> Flink
> > > > >>>>>>>> runtime in batch mode for keyed inputs.
> > > > >>>>>>>>
> > > > >>>>>>>> More specifically, ExternalSorter
> > > > >>>>>>>> <
> > > > >>>>>>>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java
> > > > >>>>>>>> is
> > > > >>>>>>>> currently used to sort keyed inputs in batch mode. It is
> > > > >>>>> automatically
> > > > >>>>>>> used
> > > > >>>>>>>> by Flink runtime in OneInputStreamTask (here
> > > > >>>>>>>> <
> > > > >>>>>>>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114
> > > > >>>>>>>>> )
> > > > >>>>>>>> and in MultiInputSortingDataInput (here
> > > > >>>>>>>> <
> > > > >>>>>>>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188
> > > > >>>>>>>>> ).
> > > > >>>>>>>> We plan to re-use the same code/mechanism to do sorting.
> > > > >>>>>>>>
> > > > >>>>>>>> It is possible that spilling to disks might cause larger
> > > > >>> overhead.
> > > > >>>>> IMO
> > > > >>>>>> it
> > > > >>>>>>>> is an orthogonal issue already existing in Flink. This is
> > > > >>> because a
> > > > >>>>>> Flink
> > > > >>>>>>>> job running batch mode might also be slower than its
> > throughput
> > > > >>> in
> > > > >>>>>> stream
> > > > >>>>>>>> mode due to the same reason. However, even though it is
> > possible
> > > > >>> in
> > > > >>>>>>> theory,
> > > > >>>>>>>> I expect that in practice the throughput of using sorting +
> > > > >>>>>>>> BatchExecutionKeyedStateBackend should be much higher than
> > using
> > > > >>>>> other
> > > > >>>>>>>> keyed statebackends when the amount of data is large. As a
> > > matter
> > > > >>>> of
> > > > >>>>>>> fact,
> > > > >>>>>>>> we have not heard of complaints of such performance
> regression
> > > > >>>> issues
> > > > >>>>>> in
> > > > >>>>>>>> batch mode.
> > > > >>>>>>>>
> > > > >>>>>>>> The primary goal of this FLIP is to allow the operator to
> run
> > at
> > > > >>>> the
> > > > >>>>>> same
> > > > >>>>>>>> throughput (in stream mode when there is backlog) as it can
> > > > >>>> currently
> > > > >>>>>> do
> > > > >>>>>>> in
> > > > >>>>>>>> batch mode. And this goal is not affected by the disk
> overhead
> > > > >>>> issue
> > > > >>>>>>>> mentioned above.
> > > > >>>>>>>>
> > > > >>>>>>>> I am thinking maybe we can treat it as an orthogonal
> > performance
> > > > >>>>>>>> optimization problem instead of solving this problem in this
> > > > >>> FLIP?
> > > > >>>>>>>>     - If it will be at least partially in-memory, does Flink
> > > have
> > > > >>>>> some
> > > > >>>>>>>>> mechanism to reserve optional memory that
> > > > >>>>>>>>>       can be revoked if a new operator starts up? Can this
> > > > >>> memory
> > > > >>>>> be
> > > > >>>>>>>>> redistributed? Ideally we should use as
> > > > >>>>>>>>>       much as possible of the available memory to avoid
> > > > >>> spilling
> > > > >>>>>> costs,
> > > > >>>>>>>> but
> > > > >>>>>>>>> also being able to revoke that memory
> > > > >>>>>>>>>
> > > > >>>>>>>> This FLIP does not support dynamically revoking/redistributing
> > > > >>>>>>>> managed memory used by the ExternalSorter.
> > > > >>>>>>>>
> > > > >>>>>>>> For operators with isInternalSorterSupported = true, we will
> > > > >>>> allocate
> > > > >>>>>> to
> > > > >>>>>>>> this operator execution.sorted-inputs.memory
> > > > >>>>>>>> <
> > > > >>>>>>>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144
> > > > >>>>>>>> amount of managed memory. This is the same as how Flink
> > > allocates
> > > > >>>>>> managed
> > > > >>>>>>>> memory to an operator when this operator has keyed inputs in
> > > > >>> batch
> > > > >>>>>> mode.
> > > > >>>>>>>> Note that this FLIP intends to support operators to sort
> > inputs
> > > > >>>>>> whenever
> > > > >>>>>>>> there is backlog. And there is currently no way for an
> > operator
> > > > >>> to
> > > > >>>>> know
> > > > >>>>>>> in
> > > > >>>>>>>> advance whether there will be no backlog after a given time.
> > So
> > > > >>> it
> > > > >>>>>> seems
> > > > >>>>>>>> simpler to just keep managed memory for such an operator
> > > > >>> throughout
> > > > >>>>> the
> > > > >>>>>>>> lifecycle of this operator, for now.
> > > > >>>>>>>>
> > > > >>>>>>>> Besides, it seems that the lack of ability to dynamically
> > > > >>>>>>>> revoke/redistribute un-used managed memory is an existing
> > issue
> > > > >>> in
> > > > >>>>>> Flink.
> > > > >>>>>>>> For example, we might have two operators sharing the same
> slot
> > > > >>> and
> > > > >>>>>> these
> > > > >>>>>>>> two operators both use managed memory (e.g. to sort inputs).
> > > > >>> There
> > > > >>>> is
> > > > >>>>>>>> currently no way for one operator to re-use the memory not
> > used
> > > > >>> by
> > > > >>>>> the
> > > > >>>>>>>> other operator.
> > > > >>>>>>>>
> > > > >>>>>>>> Therefore, I think we can treat this as an orthogonal
> > > performance
> > > > >>>>>>>> optimization problem which can be addressed separately. What
> > do
> > > > >>> you
> > > > >>>>>>> think?
> > > > >>>>>>>>
> > > > >>>>>>>>>     - Sometimes sorting, even if we have memory to do that,
> > > > >>> might
> > > > >>>>> be
> > > > >>>>>> an
> > > > >>>>>>>>> unnecessary overhead.
> > > > >>>>>>>>> - Watermarks
> > > > >>>>>>>>>     - Is holding back watermarks always good? If we have tons
> > > > >>>>>>>>> of data buffered/sorted and waiting to be processed, with
> > > > >>>>>>>>> multiple windows per key and many different keys, then when we
> > > > >>>>>>>>> switch back to `isBacklog=false` we first process all of that
> > > > >>>>>>>>> data before processing watermarks; for operators that are not
> > > > >>>>>>>>> using sorted input, the state size can explode significantly,
> > > > >>>>>>>>> causing lots of problems. Even for those that can use sorting,
> > > > >>>>>>>>> switching to sorting or BatchExecutionKeyedStateBackend is not
> > > > >>>>>>>>> always a good idea, but keeping RocksDB also can be risky.
> > > > >>>>>>>>> idea, but keeping RocksDB also can be
> > > > >>>>>>>>>        risky.
> > > > >>>>>>>>>
> > > > >>>>>>>> With the current FLIP, the proposal is to use a sorter only
> > when
> > > > >>>> the
> > > > >>>>>>> inputs
> > > > >>>>>>>> have keys. According to this practice, operators which are
> not
> > > > >>>> using
> > > > >>>>>>>> sorting should have un-keyed inputs. I believe such an
> > operator
> > > > >>>> will
> > > > >>>>>> not
> > > > >>>>>>>> even use a keyed state backend. Maybe I missed some
> use-case.
> > > Can
> > > > >>>> you
> > > > >>>>>>>> provide a use-case where we will have an operator with
> > un-keyed
> > > > >>>>> inputs
> > > > >>>>>>>> whose state size can explode because we hold back watermarks?
> > > > >>>>>>>>
> > > > >>>>>>>> For operators with keyed inputs that use sorting, I suppose
> it
> > > is
> > > > >>>>>>> possible
> > > > >>>>>>>> that sorting + BatchExecutionKeyedStateBackend can be worse
> > than
> > > > >>>>> using
> > > > >>>>>>>> RocksDB. But I believe this is very rare (if possible at all)
> > > > >>>>>>>> in almost all practical usage of Flink.
> > > > >>>>>>>>
> > > > >>>>>>>> Taking one step back: if this indeed causes a regression for a
> > > > >>>>>>>> real use-case, users can set
> > > > >>>>>>>> execution.checkpointing.interval-during-backlog to anything
> > > > >>>>>>>> other than 0 so that this FLIP will not use
> > > > >>>>>>>> sorter + BatchExecutionKeyedStateBackend even when there is
> > > > >>>>>>>> backlog.
> > > > >>>>>>>> I would hope we can find a way to automatically determine
> > > whether
> > > > >>>>> using
> > > > >>>>>>>> sorting + BatchExecutionKeyedStateBackend can be better or
> > worse
> > > > >>>> than
> > > > >>>>>>> using
> > > > >>>>>>>> RocksDB alone. But I could not find a good and reliable way
> to
> > > do
> > > > >>>>> this.
> > > > >>>>>>>> Maybe we can update Flink to do this when we find a good way
> > to
> > > > >>> do
> > > > >>>>> this
> > > > >>>>>>> in
> > > > >>>>>>>> the future?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> - Keyed state backend
> > > > >>>>>>>>>     - I think you haven't described what happens during
> > > > >>> switching
> > > > >>>>>> from
> > > > >>>>>>>>> streaming to backlog processing.
> > > > >>>>>>>>>
> > > > >>>>>>>> Good point. This indeed needs to be described. I added a
> TODO
> > in
> > > > >>>> the
> > > > >>>>>>>> "Behavior changes ..." section to describe what happens when
> > > > >>>>> isBacklog
> > > > >>>>>>>> switches from false to true, for all
> > > > >>>>> watermark/checkpoint/statebackend
> > > > >>>>>>> etc.
> > > > >>>>>>>> Let me explain this for the state backend here for now. I
> will
> > > > >>>> update
> > > > >>>>>>> FLIP
> > > > >>>>>>>> later.
> > > > >>>>>>>>
> > > > >>>>>>>> When isBacklog switches from false to true, operator with
> > keyed
> > > > >>>>> inputs
> > > > >>>>>>> can
> > > > >>>>>>>> optionally (as determined by its implementation) starts to
> use
> > > > >>>>> internal
> > > > >>>>>>>> sorter to sort inputs by key, without processing inputs or
> > > > >>> updating
> > > > >>>>>>>> statebackend, until it receives end-of-inputs or isBacklog
> is
> > > > >>>>> switched
> > > > >>>>>> to
> > > > >>>>>>>> false again.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>>     - Switch can be an unnecessary overhead.
> > > > >>>>>>>>
> > > > >>>>>>>> I agree it can cause unnecessary overhead, particularly when
> > > > >>>>> isBacklog
> > > > >>>>>>>> switches back and forth frequently. Whether or not this is
> > > > >>>>> unnecessary
> > > > >>>>>>>> likely depends on the duration/throughput of the backlog
> phase
> > > as
> > > > >>>>> well
> > > > >>>>>> as
> > > > >>>>>>>> the specific computation logic of the operator. I am not
> sure
> > > > >>> there
> > > > >>>>> is
> > > > >>>>>> a
> > > > >>>>>>>> good way for Flink to determine in advance whether switching
> > is
> > > > >>>>>>>> unnecessary.
> > > > >>>>>>>>
> > > > >>>>>>>> Note that for the existing use-cases where we expect to change
> > > > >>>>>>>> isBacklog to true (e.g. the MySQL CDC snapshot phase, or the
> > > > >>>>>>>> Kafka source watermark lag being too high), we don't expect
> > > > >>>>>>>> isBacklog to switch back and forth frequently. And users can
> > > > >>>>>>>> disable this switch by setting
> > > > >>>>>>>> execution.checkpointing.interval-during-backlog to anything
> > > > >>>>>>>> other than 0.
> > > > >>>>>>>> Therefore, I am wondering if we can also view this as a
> > > > >>> performance
> > > > >>>>>>>> optimization opportunity for extra use-cases in the future,
> > > > >>> rather
> > > > >>>>>> than a
> > > > >>>>>>>> blocking issue of this FLIP for the MVP use-case (e.g.
> > snapshot
> > > > >>>> phase
> > > > >>>>>> for
> > > > >>>>>>>> any CDC source, Kafka watermark lag).
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> At the same time, in your current proposal, for
> > > > >>>>>>>>> `execution.checkpointing.interval-during-backlog > 0` we
> > won't
> > > > >>>>>>>>> switch to "batch" mode at all. That's a bit of shame, I
> don't
> > > > >>>>>>> understand
> > > > >>>>>>>>> why those two things should be coupled
> > > > >>>>>>>>> together?
> > > > >>>>>>>>>
> > > > >>>>>>>> We can in general classify optimizations as those that are
> > > > >>>> compatible
> > > > >>>>>>> with
> > > > >>>>>>>> checkpointing, and those that are not compatible with
> > > > >>>> checkpointing.
> > > > >>>>>> For
> > > > >>>>>>>> example, input sorting is currently not compatible with
> > > > >>>>> checkpointing.
> > > > >>>>>>> And
> > > > >>>>>>>> buffering input records to reduce state backend overhead
> (and
> > > > >>>>> probably
> > > > >>>>>>>> columnar processing for mini-batch in the future) is
> > compatible
> > > > >>>> with
> > > > >>>>>>>> checkpointing.
> > > > >>>>>>>>
> > > > >>>>>>>> The primary goal of FLIP-327 is to support optimizations that
> > > > >>>>>>>> are not compatible with checkpointing. If
> > > > >>>>>>>> execution.checkpointing.interval-during-backlog > 0, which means
> > > > >>>>>>>> that the user intends to still do checkpointing even when there
> > > > >>>>>>>> is backlog, then we will not be able to support such
> > > > >>>>>>>> optimizations.
> > optimizations.
> > > > >>>>>>>>
> > > > >>>>>>>> For optimizations that are compatible with checkpointing, we
> > can
> > > > >>> do
> > > > >>>>>> this
> > > > >>>>>>>> even when the operator does not run in "batch mode". There
> are
> > > > >>>> extra
> > > > >>>>>>>> problems to solve in order to achieve this optimization,
> such
> > as
> > > > >>>>>>> supporting
> > > > >>>>>>>> unaligned checkpointing without prolonging its sync phase. I
> > > plan
> > > > >>>> to
> > > > >>>>>>>> explain how this can be done in FLIP-325.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> All in all, shouldn't we aim for some more clever process
> of
> > > > >>>>>> switching
> > > > >>>>>>>> back
> > > > >>>>>>>>> and forth between streaming/batch modes
> > > > >>>>>>>>> for watermark strategy/state backend/sorting based on some
> > > > >>>> metrics?
> > > > >>>>>>>> Trying
> > > > >>>>>>>>> to either predict if switching might help,
> > > > >>>>>>>>> or trying to estimate if the last switch was beneficial?
> > Maybe
> > > > >>>>>>> something
> > > > >>>>>>>>> along the lines:
> > > > >>>>>>>>> - sort only in memory and during sorting count the number
> of
> > > > >>>>> distinct
> > > > >>>>>>>> keys
> > > > >>>>>>>>> (NDK)
> > > > >>>>>>>>>     - maybe allow for spilling if so far in memory we have
> > NDK
> > > > >>> *
> > > > >>>> 5
> > > > >>>>>> =
> > > > >>>>>>>>> #records
> > > > >>>>>>>>> - do not allow to buffer records above a certain threshold,
> > as
> > > > >>>>>>> otherwise
> > > > >>>>>>>>> checkpointing can explode
> > > > >>>>>>>>> - switch to `BatchExecutionKeyedStateBackend` only if NDK
> * 2
> > > > >>>> =
> > > > >>>>>>> #records
> > > > >>>>>>>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
> > > > >>>>>>>>>
> > > > >>>>>>>>> Or even maybe for starters something even simpler and then
> > test
> > > > >>>> out
> > > > >>>>>>>>> something more fancy as a follow up?
> > > > >>>>>>>>>
> > > > >>>>>>>> I agree it is worth investigating these ideas to further
> > > optimize
> > > > >>>> the
> > > > >>>>>>>> performance during backlog.
> > > > >>>>>>>>
> > > > >>>>>>>> I just think these can be done independently after this
> FLIP.
> > > The
> > > > >>>>> focus
> > > > >>>>>>> of
> > > > >>>>>>>> this FLIP is to re-use in stream mode the same optimization
> > > which
> > > > >>>> we
> > > > >>>>>>>> already use in batch mode, rather than inventing or
> improving
> > > the
> > > > >>>>>>>> performance of these existing optimizations.
> > > > >>>>>>>>
> > > > >>>>>>>> Given that there are already a lot of new mechanism/features
> > to
> > > > >>>>> discuss
> > > > >>>>>>> and
> > > > >>>>>>>> address in this FLIP, I am hoping we can limit the scope of
> > this
> > > > >>>> FLIP
> > > > >>>>>> to
> > > > >>>>>>>> re-use the existing optimization, and do these extra
> > > optimization
> > > > >>>>>>>> opportunities as future work.
> > > > >>>>>>>>
> > > > >>>>>>>> What do you think?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> At the same time,
> > > > >>>>> `execution.checkpointing.interval-during-backlog=0`
> > > > >>>>>>>> seems
> > > > >>>>>>>>> a weird setting to me, that I would
> > > > >>>>>>>>> not feel safe recommending to anyone. If processing of a
> > > > >>> backlog
> > > > >>>>>> takes
> > > > >>>>>>> a
> > > > >>>>>>>>> long time, a job might stop making
> > > > >>>>>>>>> any progress due to some random failures. Especially
> > dangerous
> > > > >>>> if a
> > > > >>>>>> job
> > > > >>>>>>>> switches from streaming mode back to
> > > > >>>>>>>>> backlog processing due to some reasons, as that could
> happen
> > > > >>>> months
> > > > >>>>>>> after
> > > > >>>>>>>>> someone started a job with this
> > > > >>>>>>>>> strange setting. So should we even have it? I would simply
> > > > >>>> disallow
> > > > >>>>>>> it. I
> > > > >>>>>>>> Good point. I do agree we need further work to improve the
> > > > >>>>>>>> failover performance in case any task fails.
> > > > >>>>>>>>
> > > > >>>>>>>> As of the current FLIP, if any task fails during backlog and
> > > > >>>>>>>> execution.checkpointing.interval-during-backlog = 0, we will
> > > need
> > > > >>>> to
> > > > >>>>>>>> restart all operators to the last checkpointed state and
> > > continue
> > > > >>>>>>>> processing backlog. And this can be a lot of rollback since
> > > there
> > > > >>>> is
> > > > >>>>> no
> > > > >>>>>>>> checkpoint during backlog. And this can also be worse than
> > batch
> > > > >>>>> since
> > > > >>>>>>> this
> > > > >>>>>>>> FLIP currently does not support exporting/saving records to
> > > local
> > > > >>>>> disk
> > > > >>>>>>> (or
> > > > >>>>>>>> shuffle service) so that a failed task can re-consume the
> > > records
> > > > >>>>> from
> > > > >>>>>>> the
> > > > >>>>>>>> upstream task (or shuffle service) in the same way as how Flink
> > > > >>>>>>>> fails over a task in batch mode.
> > > > >>>>>>>>
> > > > >>>>>>>> I think we can extend this FLIP to solve this problem so
> that
> > it
> > > > >>>> can
> > > > >>>>>> have
> > > > >>>>>>>> at least the same behavior/performance as batch-mode job.
> The
> > > > >>> idea
> > > > >>>> is
> > > > >>>>>> to
> > > > >>>>>>>> also follow what batch mode does. For example, we can
> trigger
> > a
> > > > >>>>>>> checkpoint
> > > > >>>>>>>> when isBacklog switches to true, and every operator should
> > > buffer
> > > > >>>> its
> > > > >>>>>>>> output in the TM local disk (or remote shuffle service).
> > > > >>> Therefore,
> > > > >>>>>>> after a
> > > > >>>>>>>> task fails, it can restart from the last checkpoint and
> > > > >>> re-consume
> > > > >>>>> data
> > > > >>>>>>>> buffered in the upstream task.
> > > > >>>>>>>>
> > > > >>>>>>>> I will update FLIP as described above. Would this address
> your
> > > > >>>>> concern?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> could see a power setting like:
> > > > >>>>>>>>>         `execution.backlog.use-full-batch-mode-on-start
> > > > >>> (default
> > > > >>>>>>> false)`
> > > > >>>>>>>> I am not sure I fully understand this config or its
> > motivation.
> > > > >>> Can
> > > > >>>>> you
> > > > >>>>>>>> help explain the exact semantics of this config?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> that would override any heuristic of switching to backlog
> if
> > > > >>>>> someone
> > > > >>>>>> is
> > > > >>>>>>>>> submitting a new job that starts with
> > > > >>>>>>>>> `isBacklog=true`.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Or we could limit the scope of this FLIP to only support
> > > > >>> starting
> > > > >>>>>> with
> > > > >>>>>>>>> batch mode and switching only once to
> > > > >>>>>>>>> streaming, and design a follow up with switching back and
> > > > >>> forth?
> > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP
> > into
> > > > >>>> two
> > > > >>>>>>> FLIPs
> > > > >>>>>>>> so that we can make incremental progress.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> Dong
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>> I'm looking forward to hearing/reading your thoughts.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best,
> > > > >>>>>>>>> Piotrek
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> śr., 12 lip 2023 o 12:38 Jing Ge
> <j...@ververica.com.invalid
> > >
> > > > >>>>>>>> napisał(a):
> > > > >>>>>>>>>> Hi Dong,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks for your reply!
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Best regards,
> > > > >>>>>>>>>> Jing
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <
> > > > >>> lindon...@gmail.com>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>>>> Hi Jing,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the comments. Please see my reply inline.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge
> > > > >>>>>> <j...@ververica.com.invalid
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Hi Dong,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks for the clarification. Now it is clear to me. I have
> > > > >>>>>>>>>>>> additional noob questions w.r.t. the internal sorter.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 1. when to call setter to set the
> internalSorterSupported
> > > > >>>> to
> > > > >>>>> be
> > > > >>>>>>>> true?
> > > > >>>>>>>>>>> The developer of the operator class (i.e. those classes which
> > > > >>>>>>>>>>> implement `StreamOperator`) should override the
> > > > >>>>>>>>>>> `#getOperatorAttributes()` API to set internalSorterSupported
> > > > >>>>>>>>>>> to true, if he/she decides to sort records internally in the
> > > > >>>>>>>>>>> operator.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> 2
> > > > >>>>>>>>>>>> *"For those operators whose throughput can be
> > > > >>> considerably
> > > > >>>>>>> improved
> > > > >>>>>>>>>> with
> > > > >>>>>>>>>>> an
> > > > >>>>>>>>>>>> internal sorter, update it to take advantage of the
> > > > >>>> internal
> > > > >>>>>>> sorter
> > > > >>>>>>>>>> when
> > > > >>>>>>>>>>>> its input has isBacklog=true.*
> > > > >>>>>>>>>>>> *Typically, operators that involve aggregation operation
> > > > >>>>> (e.g.
> > > > >>>>>>>> join,
> > > > >>>>>>>>>>>> cogroup, aggregate) on keyed inputs can benefit from
> > > > >>> using
> > > > >>>> an
> > > > >>>>>>>>> internal
> > > > >>>>>>>>>>>> sorter."*
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> *"The operator that performs CoGroup operation will
> > > > >>>>> instantiate
> > > > >>>>>>> two
> > > > >>>>>>>>>>>> internal sorter to sorts records from its two inputs
> > > > >>>>>> separately.
> > > > >>>>>>>> Then
> > > > >>>>>>>>>> it
> > > > >>>>>>>>>>>> can pull the sorted records from these two sorters. This
> > > > >>>> can
> > > > >>>>> be
> > > > >>>>>>>> done
> > > > >>>>>>>>>>>> without wrapping input records with TaggedUnion<...>. In
> > > > >>>>>>>> comparison,
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>> existing DataStream#coGroup needs to wrap input records
> > > > >>>> with
> > > > >>>>>>>>>>>> TaggedUnion<...> before sorting them using one external
> > > > >>>>> sorter,
> > > > >>>>>>>> which
> > > > >>>>>>>>>>>> introduces higher overhead."*
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> According to the performance test, it seems that the
> > > > >>>>>>>>>>>> internal sorter has better performance than the external
> > > > >>>>>>>>>>>> sorter. Is it possible to make those operators that can
> > > > >>>>>>>>>>>> benefit from it use the internal sorter by default?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> Yes, it is possible. After this FLIP is done, users can use
> > > > >>>>>>>>>>> DataStream#coGroup with EndOfStreamWindows as the window
> > > > >>>>>>>>>>> assigner to co-group two streams in effectively a batch
> > > > >>>>>>>>>>> manner. An operator that uses an internal sorter will be used
> > > > >>>>>>>>>>> to perform the co-group operation. There is no need for users
> > > > >>>>>>>>>>> of the DataStream API to explicitly know about or set the
> > > > >>>>>>>>>>> internal sorter in any way.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> In the future, we plan to incrementally optimize other
> > > > >>>>>>>>>>> aggregation operations (e.g. aggregate) on the DataStream
> > > > >>>>>>>>>>> API when EndOfStreamWindows is used as the window assigner.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> Dong
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Best regards,
> > > > >>>>>>>>>>>> Jing
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >>>>>>>>>>>>> Hi Jing,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thank you for the comments! Please see my reply inline.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi Dong,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks for the proposal! The FLIP is already in good
> > > > >>>>>>>>>>>>>> shape. I have some NIT questions.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 1. It is a little bit weird to write the hint that some
> > > > >>>>>>>>>>>>>> features have been moved to FLIP-331 right after the
> > > > >>>>>>>>>>>>>> motivation, because at that point readers don't yet have
> > > > >>>>>>>>>>>>>> the context to know what features it refers to. I would
> > > > >>>>>>>>>>>>>> suggest moving the note to the beginning of the "Public
> > > > >>>>>>>>>>>>>> interfaces" section.
> > > > >>>>>>>>>>>>> Given that the reviewer who commented on this email thread
> > > > >>>>>>>>>>>>> before I refactored the FLIP (i.e. Piotr) has read
> > > > >>>>>>>>>>>>> FLIP-331, I think it is simpler to just remove any mention
> > > > >>>>>>>>>>>>> of FLIP-331. I have updated the FLIP accordingly.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 2. It is also a little bit weird to describe all
> > > > >>>>>>>>>>>>>> behaviour changes first but then focus on only one single
> > > > >>>>>>>>>>>>>> feature, i.e. how to implement internalSorterSupported.
> > > > >>>>>>>>>>>>>> TBH, I was lost while reading the Public interfaces
> > > > >>>>>>>>>>>>>> section. Maybe change the FLIP title? Another option
> > > > >>>>>>>>>>>>>> could be to write a short summary of all features and
> > > > >>>>>>>>>>>>>> point out that this FLIP will only focus on the
> > > > >>>>>>>>>>>>>> internalSorterSupported feature. Others could be found in
> > > > >>>>>>>>>>>>>> FLIP-331. WDYT?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Conceptually, the purpose of this FLIP is to allow a
> > > > >>>>>>>>>>>>> stream-mode job to run parts of the topology in batch mode
> > > > >>>>>>>>>>>>> so that it can apply optimizations/computations that
> > > > >>>>>>>>>>>>> cannot be used together with checkpointing (and are thus
> > > > >>>>>>>>>>>>> not usable in stream mode). Although the internal sorter
> > > > >>>>>>>>>>>>> is the only optimization immediately supported in this
> > > > >>>>>>>>>>>>> FLIP, this FLIP lays the foundation for supporting other
> > > > >>>>>>>>>>>>> optimizations in the future, such as using a GPU to
> > > > >>>>>>>>>>>>> process a bounded stream of records.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Therefore, I find it better to keep the current title
> > > > >>>>>>>>>>>>> rather than limiting the scope to the internal sorter.
> > > > >>>>>>>>>>>>> What do you think?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 3. There seems to be a typo at 4) Checkpoint and failover
> > > > >>>>>>>>>>>>>> strategy -> Mixed mode ->
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>    - If any task fails when isBacklog=false true, this
> > > > >>>>>>>>>>>>>>      task is restarted to re-process its input from the
> > > > >>>>>>>>>>>>>>      beginning.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thank you for catching this issue. It is fixed now.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>> Dong
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Best regards
> > > > >>>>>>>>>>>>>> Jing
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>> Hi Piotr,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks for your comments! Please see my reply inline.
> > > > >>>>>>>>>>>>>>> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Dong,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I have a couple of questions.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Could you explain why those properties
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>     @Nullable private Boolean isOutputOnEOF = null;
> > > > >>>>>>>>>>>>>>>>     @Nullable private Boolean isOutputOnCheckpoint = null;
> > > > >>>>>>>>>>>>>>>>     @Nullable private Boolean isInternalSorterSupported = null;
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> must be `@Nullable`, instead of having the default
> > > > >>>>>>>>>>>>>>>> value set to `false`?
> > > > >>>>>>>>>>>>>>> By initializing these private variables in
> > > > >>>>>>>>>>>>>>> OperatorAttributesBuilder as null, we can implement
> > > > >>>>>>>>>>>>>>> `OperatorAttributesBuilder#build()` in such a way that it
> > > > >>>>>>>>>>>>>>> prints DEBUG-level logging to say "isOutputOnCheckpoint
> > > > >>>>>>>>>>>>>>> is not explicitly set". This can help users/SREs debug
> > > > >>>>>>>>>>>>>>> performance issues (or the lack of an expected
> > > > >>>>>>>>>>>>>>> optimization) caused by operators not explicitly setting
> > > > >>>>>>>>>>>>>>> the right operator attribute.
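> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> A minimal sketch of how `build()` could emit such a log
> > > > >>>>>>>>>>>>>>> (the logger and the OperatorAttributes constructor shape
> > > > >>>>>>>>>>>>>>> are illustrative assumptions):
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> public OperatorAttributes build() {
> > > > >>>>>>>>>>>>>>>     if (isOutputOnCheckpoint == null) {
> > > > >>>>>>>>>>>>>>>         LOG.debug("isOutputOnCheckpoint is not explicitly set.");
> > > > >>>>>>>>>>>>>>>     }
> > > > >>>>>>>>>>>>>>>     // Similar checks for the other nullable attributes
> > > > >>>>>>>>>>>>>>>     // are omitted; a null attribute falls back to false
> > > > >>>>>>>>>>>>>>>     // in the built instance.
> > > > >>>>>>>>>>>>>>>     return new OperatorAttributes(
> > > > >>>>>>>>>>>>>>>             isOutputOnEOF != null ? isOutputOnEOF : false,
> > > > >>>>>>>>>>>>>>>             isOutputOnCheckpoint != null ? isOutputOnCheckpoint : false,
> > > > >>>>>>>>>>>>>>>             isInternalSorterSupported != null
> > > > >>>>>>>>>>>>>>>                     ? isInternalSorterSupported : false);
> > > > >>>>>>>>>>>>>>> }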
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> For example, we might want a job to always use the
> > > > >>>>>>>>>>>>>>> longer checkpointing interval (i.e.
> > > > >>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog) if all
> > > > >>>>>>>>>>>>>>> running operators have isOutputOnCheckpoint==false, and
> > > > >>>>>>>>>>>>>>> use the shorter checkpointing interval otherwise. If a
> > > > >>>>>>>>>>>>>>> user has explicitly configured
> > > > >>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog but the
> > > > >>>>>>>>>>>>>>> two-phase-commit sink library has not been upgraded to
> > > > >>>>>>>>>>>>>>> set isOutputOnCheckpoint=true, then the job will end up
> > > > >>>>>>>>>>>>>>> using the long checkpointing interval, and it will be
> > > > >>>>>>>>>>>>>>> useful to figure out what is going wrong in such a case
> > > > >>>>>>>>>>>>>>> by checking the log.
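> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> For instance, with a configuration along the following
> > > > >>>>>>>>>>>>>>> lines (the values are purely illustrative), the job would
> > > > >>>>>>>>>>>>>>> checkpoint every 10 minutes while any source reports
> > > > >>>>>>>>>>>>>>> isProcessingBacklog=true, and every 30 seconds otherwise:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> execution.checkpointing.interval: 30s
> > > > >>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog: 10min
> > > > >>>>>>>>>>>>>>>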
> > > > >>>>>>>>>>>>>>> Note that the default value of these fields in the
> > > > >>>>>>>>>>>>>>> OperatorAttributes instance built by
> > > > >>>>>>>>>>>>>>> OperatorAttributesBuilder will still be false. The
> > > > >>>>>>>>>>>>>>> following is mentioned in the Java doc of
> > > > >>>>>>>>>>>>>>> `OperatorAttributesBuilder#build()`:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> /**
> > > > >>>>>>>>>>>>>>>  * If any operator attribute is null, we will log it at
> > > > >>>>>>>>>>>>>>>  * DEBUG level