Hi Piotr,

Thanks for your comments. Please see my reply inline.


On Thu, Jul 6, 2023 at 1:54 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi,
>
> Thanks for this proposal, this is a very much needed thing that should be
> addressed in Flink.
>
> I think there is one thing that hasn't been discussed neither here nor in
> FLIP-309. Given that we have
> three dimensions:
> - e2e latency/checkpointing interval
> - enabling some kind of batching/buffering on the operator level
> - how much resources we want to allocate to the job
>
> How do we want Flink to adjust itself between those three? For example:
> a) Should we assume that given Job has a fixed amount of assigned resources
> and make it paramount that
>   Flink doesn't exceed those available resources? So in case of
> backpressure, we
>   should extend checkpointing intervals, emit records less frequently and
> in batches.


Yes, this FLIP works under the assumption (also the current behavior of
Flink in stream mode) that the given job has a fixed amount of the assigned
resources.

When any source reader has IsProcessingBacklog=false, it can
invoke Output#emitRecordAttributes to
emit RecordAttributes(isBacklog=true). This event tells the downstream
operators that they can optionally buffer the records received after this
RecordAttributes event until isBacklog=false. With the correponding support
in the operator implementation, the job can achieve the goal of emitting
records less frequently and in batches.

And yes, we can also extend the checkpointing interval when any source
reader is processing a backlog. By using the APIs proposed in FLIP-309,
source reader can tell its isProcessingBacklog status to the checkpoing
coordinator, which can use the longer checkpointing interval (i.e.
execution.checkpointing.interval-during-backlog) as specified by the user.

Currently, FLIP-309 only supports adjusting the IsProcessingBacklog when
the source reader explicitly invokes an API (e.g. setProcessingBacklog). In
the future, we can add plugin to support adjusting the IsProcessingBacklog
status based on e.g. backpressure metrics or any other reasonable source of
information in TM/JM, so that the checkpoint interval and the operator
buffer time can be adjusted accordingly (e.g. when backpressure is high).



> b) Or should we assume that the amount of resources is flexible (up to a
> point?), and the desired e2e latency
>   is the paramount aspect? So in case of backpressure, we should still
> adhere to the configured e2e latency,
>   and wait for the user or autoscaler to scale up the job?
>

I would say it will be an awesome feature to let Flink automatically adjust
its resource usage to meet the user's throughput/latency requirement with
minimal resources.

The following challenges would need to be addressed in order to have this
feature:
- Find the best way to allocate resources (e.g. CPU, memory, parallelism)
across operators.
- Predict the impact on the throughput/latency as we adjust the resource
usages, the buffering time, checkpointing interval etc.
- Balance between the overhead of adjusting/restarting operators and the
benefits of making resource adjustments.

I don't have a good idea on how to address the above issues nicely.
Supposing flexible resource usage is currently out of the scope of this
FLIP.


> In case of a), I think the concept of "isProcessingBacklog" is not needed,
> we could steer the behaviour only
> using the backpressure information.
>

I guess the point is that the downstream operators (and also the checkpoint
coordinator) need a boolean value telling them whether they can start to
buffer records (and use the long checkpointing interval). We can also
rename this value as isUnderBackPressure, or any other name that we find
reasonable. I don't have a strong preference between the choices of this
name.

I think what matters is how to derive the boolean value of
IsProcessingBacklog (I will use this name for now). It seems that there are
a couple of valuable ways to derive this value, and no single one of them
can cover all scenarios with the best performance:

- Allow source reader to determine the IsProcessingBacklog based on the
watermark lag (only when event-time is available).
- Allow source reader to determine the IsProcessingBacklog based on the
backpressure (with the potential downside of introducing backlog when there
is a short interval of traffic spikes on just one reader)
- Allow source reader to determine the IsProcessingBacklog based on its
internal state/event (e.g. MySQL CDC switches from the snapshot phase to
the binlog phase).


> On the other hand, in case of b), "isProcessingBacklog" information might
> be helpful, to let Flink know that
> we can safely decrease the e2e latency/checkpoint interval even if there is
> no backpressure, to use fewer
> resources (and let the autoscaler scale down the job).
>
> Do we want to have both, or only one of those? Do a) and b) complement one
> another? If job is backpressured,
> we should follow a) and expose to autoscaler/users information "Hey! I'm
> barely keeping up! I need more resources!".
> While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> usage.
>

The purpose of this FLIP is to support a). It will be really nice to
support b) as well. And IMO, a) and b) are mostly orthogonal features (in
terms of the amount of development work) and can complement each other.


>
> And a couple of more concrete remarks about the current proposal.
>
> 1.
>
> > I think the goal is to allow users to specify an end-to-end latency
> budget for the job.
>
> I fully agree with this, but in that case, why are you proposing to add
> `execution.flush.interval`? That's
> yet another parameter that would affect e2e latency, without actually
> defining it. We already have things
> like: execution.checkpointing.interval, execution.buffer-timeout. I'm
> pretty sure very few Flink users would be
> able to configure or understand all of them.
>

My bad. The change around execution.checkpointing.interval was published by
mistake.

When I was replying to Martijn's comment, I thought about using
execution.flush.interval at some point, but rejected it later. In my
previous reply to Martijn's email, I said "execution.flush.interval is
rejected because we might flush at higher frequency to deal with ...".



>
> I think we should simplify configuration and try to define
> "execution.end-to-end-latency" so the runtime
> could derive other things from this new configuration.
>

Cool, I have updated the FLIP to use execution.end-to-end-latency.


>
> 2. How do you envision `#flush()` and `#snapshotState()` to be connected?
> So far, `#snapshotState()`
> was considered as a kind of `#flush()` signal. Do we need both? Shouldn't
> `#flush()` be implicitly or
> explicitly attached to the `#snapshotState()` call?
>

IMO, we need both flush() and snapshotState() because there are use-cases
where snapshotState() alone is not sufficient.

For example, suppose we have a job that uses `KeydStream#aggregate()` and
the job does not have any two-phase commit sink. Users might need the
end-to-end latency to be 10 sec. Without flush(), we would have to trigger
`#snapshotState()` every 10 seconds, which is likely much more expensive
than invoking `#flush()` every 10 sec.

I guess you are asking whether the Flink runtime infra should always invoke
`#flush()` before invoking `#snapshotState()`? I would prefer not to
enforce it because the semantics of `#snapshotState()` itself does not
conceptually involve `#flush()`. But I also don't think there will be any
correctness or performance issues if we do this, and would be open to
enforcing this if you prefer it and if we do not discover any other issue
of doing this.


> 3. What about unaligned checkpoints if we have separate `#flush()`
> event/signal?
>

IMO, FlushEvent will always be aligned with the data records regardless of
whether the unaligned checkpoint is enabled.

This is because when the job is under backpressure, the end-to-end latency
will be negatively affected anyway, and it won't help end-to-end latency
even if we let the FlushEvent surpass the records queued before it in the
input channel. So it seems simpler to always handle FlushEvent() in an
aligned manner.


>
> 4. How should this be working in at-least-once mode (especially sources
> that are configured to be working
> in at-least-once mode)?.
>

When the job is running in the exactly-once mode, each operator will wait
for FlushEvents (with the same ID) from all its inputs before invoking the
`#flush()`.

When the job is running in the at-least-once mode, each operator will
invoke `#flush()` once it receives a FlushEvent from any of its inputs, at
long as the ID of this event is larger than the largest ID of the
*FlushEvents it has received earlier.


>
> 5. How is this FLIP connected with FLI-327? I think they are trying to
> achieve basically the same thing:
> optimise when data should be flushed/committed to balance between
> throughput and latency.
>

FLIP-327 builds on top of the APIs introduced in FLIP-325 (e.g.
RecordAttributes) and can further enhance Flink's capability for
stream-batch unified use-cases.

More specifically, FLIP-327 allows us to take advantage of those batch-like
optimizations (e.g. buffering + sorting + BatchExecutionKeyedStateBackend)
which is incompatible with checkpoint (e.g. ExternalSorter currently does
not support checkpoint). It can deliver much higher throughput for some
operations (e.g. cogroup) than we can do with buffering alone.

In comparison, FLIP-325 still requires the operator to support
checkpointing even when execution.end-to-end-latency is configured.

The goal of FLIP-327 requires us to update the logics of building JobGraph,
such as allowing the output edge of an operator to be blocking in the
stream mode, and add managed memory for an operator that claims it uses an
internal sorter.


> 6.
>
> > Add RecordAttributesBuilder and RecordAttributes that extends
> StreamElement to provide operator with essential
> > information about the records they receive, such as whether the records
> are already stale due to backlog.
>
> Passing along `RecordAttribute` for every `StreamElement` would be an
> extremely inefficient solution.
>

Currently, the FLIP provides the `Output#emitRecordAttributes()` for
operators (e.g. source reader) to emit RecordAttributes. The FLIP leaves
the operator to decide the frequency and value of the emitted
RecordAttributes.

Our plan is to let SourceReader emit RecordAttributes only when its value
(e.g. isBacklog) differs from the value of the RecordAttributes it has
emitted earlier. It should avoid resending RecordAttributes with the same
value, similar to how Flink currently avoids resending
Watermark/WatermarkStatus with the same value.

Would it address your concern?


> If at all, this should be a marker propagated through the JobGraph vie
> Events or sent from JM to TMs via an RPC
> that would mark "backlog processing started/ended". Note that Events might
> be costly, as they need to be
> broadcasted. So with a job having 5 keyBy exchanges and parallelism of
> 1000, the number of events sent is
> ~4 000 000, while the number of RPCs would be only 5000.
>

I think we probably need to propagate the IsBacklog (or IsBackPressure,
whatever name we find reasonable) signal from sources to downstream nodes
similar to how we currently propagate watermarks. The benefit is that we
can effectively align the IsBacklog signal with the data records, and tell
operator the following information:

records received before receiving RecordAttributes(isBacklog=false) can to
be processed/emitted with high processing latency because these records are
already very stale due to backlog; but records received after
RecordAttributes(isBacklog=false) are fresh and need to be processed with
low processing latency.

It would be hard to have this semantics if we let JM send the signal
directly to all the TMs.


>
> In case we want to only check for the backpressure, we don't need any extra
> signal. Operators/subtasks can
> get that information very easily from the TMs runtime.


We would need the extra signal to cover the case where the IsBacklog status
needs to be derived from source state (e.g. MySQL source snapshot/binlog
stage).

When we want to only check for the backpressure, another potential benefit
of propagating isBacklog from source operators to downstream operators is
that we might take advantage of features in FLIP-327, dynamically disable
checkpoint, and apply sorting to optimize the operator throughput. IMHO,
such optimization is probably easier to understand and operate by having
source operators decide the backpressure, than letting each operator decide
the pressure status using local information.

What do you think? I am looking forward to your suggestions!

Best,
Dong


> Best,
> Piotrek
>
> czw., 29 cze 2023 o 17:19 Dong Lin <lindon...@gmail.com> napisał(a):
>
> > Hi Shammon,
> >
> > Thanks for your comments. Please see my reply inline.
> >
> > On Thu, Jun 29, 2023 at 6:01 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi Dong and Yunfeng,
> > >
> > > Thanks for bringing up this discussion.
> > >
> > > As described in the FLIP, the differences between `end-to-end latency`
> > and
> > > `table.exec.mini-batch.allow-latency` are: "It allows users to specify
> > the
> > > end-to-end latency, whereas table.exec.mini-batch.allow-latency applies
> > to
> > > each operator. If there are N operators on the path from source to
> sink,
> > > the end-to-end latency could be up to
> > table.exec.mini-batch.allow-latency *
> > > N".
> > >
> > > If I understand correctly, `table.exec.mini-batch.allow-latency` is
> also
> > > applied to the end-to-end latency for a job, maybe @Jack Wu can give
> more
> > > information.
> > >
> >
> > Based on what I can tell from the doc/code and offline discussion, I
> > believe table.exec.mini-batch.allow-latency is not applied to the
> > end-to-end latency for a job.
> >
> > It is mentioned here
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/
> > >
> > that
> > table.exec.mini-batch.allow-latency is "the maximum latency can be used
> for
> > MiniBatch to buffer input records". I think we should have mentioned that
> > the config is applied to the end-to-end latency in this doc if it is
> indeed
> > the case.
> >
> >
> > > So, from my perspective, and please correct me if I'm misunderstand,
> the
> > > targets of this FLIP may include the following:
> > >
> > > 1. Support a mechanism like  `mini-batch` in SQL for `DataStream`,
> which
> > > will collect data in the operator and flush data when it receives a
> > `flush`
> > > event, in the FLIP it is `FlushEvent`.
> > >
> >
> > I think the goal is to allow users to specify an end-to-end latency
> budget
> > for the job. IMO it is quite different from the `mini-batch` in SQL.
> >
> >
> > >
> > > 2. Support dynamic `latency` according to the progress of job, such as
> > > snapshot stage and after that.
> > >
> > > To do that, I have some questions:
> > >
> > > 1. I didn't understand the purpose of public interface
> > `RecordAttributes`.
> > > I think `FlushEvent` in the FLIP is enough, and different
> > > `DynamicFlushStrategy` can be added to generate flush events to address
> > > different needs, such as a static interval similar to mini-batch in SQL
> > or
> > > collect the information `isProcessingBacklog` and metrics to generate
> > > `FlushEvent` which is described in your FLIP? If hudi sink needs the
> > > `isBacklog` flag, the hudi `SplitEnumerator` can create an operator
> event
> > > and send it to hudi source reader.
> > >
> >
> > Suppose we only have FlushEvent, then operators (e.g. Hudi Sink) will not
> > know they can buffer data in the following scenario:
> >
> > - execution.allowed-latency is not configured and use the default value
> > null.
> > - The job is reading from HybridSource and HybridSource says
> > isBacklog=true.
> >
> > Also note that Hudi Sink might not be the only operators that can benefit
> > from knowing isBacklog=true. Other sinks and aggregation operators (e.g.
> > CoGroup) can also increase throughput by buffering/sorting records when
> > there is backlog. So it seems simpler to pass RecordAttributes to these
> > operators than asking every operator developer to create operator event
> > etc.
> >
> >
> > >
> > > 2. How is this new mechanism unified with SQL's mini-batch mechanism?
> As
> > > far as I am concerned, SQL implements mini-batch mechanism based on
> > > watermark, I think it is very unreasonable to have two different
> > > implementation in SQL and DataStream.
> > >
> >
> > I think we can deprecate table.exec.mini-batch.allow-latency later
> > once execution.allowed-latency is ready for production usage. This is
> > mentioned in the "Compatibility, Deprecation, and Migration Plan"
> section.
> >
> > If there is a config that supports user specifying the e2e latency, it is
> > probably reasonable for this config to work for both DataStream and SQL.
> >
> >
> > > 3. I notice that the `CheckpointCoordinator` will generate
> `FlushEvent`,
> > > which information about `FlushEvent` will be stored in
> > >
> >
> > CheckpointCoordinator might need to send FlushEvent before triggering
> > checkpoint in order to deal with the two-phase commit sinks. The
> algorithm
> > is specified in the "Proposed Changes" section.
> >
> >
> > > `Checkpoint`? What is the alignment strategy for FlushEvent in the
> > > operator? The operator will flush the data when it receives all
> > > `FlushEvent` from upstream with the same ID or do flush for each
> > > `FlushEvent`? Can you give more detailed proposal about that? We also
> > have
> > > a demand for this piece, thanks
> > >
> >
> > After an operator has received a FlushEvent:
> > - If the ID of the received FlushEvent is larger than the largest ID this
> > operator has received, then flush() is triggered for this operator and
> the
> > operator should broadcast FlushEvent to downstream operators.
> > - Otherwise, this FlushEvent is ignored.
> >
> > This behavior is specified in the Java doc of the FlushEvent.
> >
> > Can you see if this answers your questions?
> >
> > Best,
> > Dong
> >
> >
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > >
> > > On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <
> martijnvis...@apache.org
> > >
> > > wrote:
> > >
> > >> Hi Dong and Yunfeng,
> > >>
> > >> Thanks for the FLIP. What's not clear for me is what's the expected
> > >> behaviour when the allowed latency can't be met, for whatever reason.
> > >> Given that we're talking about an "allowed latency", it implies that
> > >> something has gone wrong and should fail? Isn't this more a minimum
> > >> latency that you're proposing?
> > >>
> > >> There's also the part about the Hudi Sink processing records
> > >> immediately upon arrival. Given that the SinkV2 API provides the
> > >> ability for custom post and pre-commit topologies [1], specifically
> > >> targeted to avoid generating multiple small files, why isn't that
> > >> sufficient for the Hudi Sink? It would be great to see that added
> > >> under Rejected Alternatives if this is indeed not sufficient.
> > >>
> > >> Best regards,
> > >>
> > >> Martijn
> > >>
> > >> [1]
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > >>
> > >> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
> > >> <flink.zhouyunf...@gmail.com> wrote:
> > >> >
> > >> > Hi all,
> > >> >
> > >> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > >> > support configuring end-to-end allowed latency for Flink jobs, which
> > >> > has been documented in FLIP-325
> > >> > <
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency
> > >> >.
> > >> >
> > >> > By configuring the latency requirement for a Flink job, users would
> be
> > >> > able to optimize the throughput and overhead of the job while still
> > >> > acceptably increasing latency. This approach is particularly useful
> > >> > when dealing with records that do not require immediate processing
> and
> > >> > emission upon arrival.
> > >> >
> > >> > Please refer to the FLIP document for more details about the
> proposed
> > >> > design and implementation. We welcome any feedback and opinions on
> > >> > this proposal.
> > >> >
> > >> > Best regards.
> > >> >
> > >> > Dong and Yunfeng
> > >>
> > >
> >
>

Reply via email to