Hi all,

Thanks for the review!

Becket and I discussed this FLIP offline and we agreed on several things
that need to be improved with this FLIP. I will summarize our discussion
with the problems and TODOs. We will update the FLIP and let you know once
the FLIP is ready for review again.

1) Investigate whether it is possible to update the existing GlobalWindows
in a backward-compatible way and re-use it for the same purpose
as EndOfStreamWindows, without introducing EndOfStreamWindows as a new
class.

Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance
which will not trigger window's computation even on end-of-inputs. We will
need to investigate its existing usage and see if we can re-use it in a
backward-compatible way.

2) Let JM know whether any operator in the upstream of the operator with
"isOutputOnEOF=true" will emit output via any side channel. The FLIP should
update the execution mode of those operators *only if* all outputs from
those operators are emitted only at the end of input.

More specifically, the upstream operator might involve a user-defined
operator that might emit output directly to an external service, where the
emission operation is not explicitly expressed as an operator's output edge
and thus not visible to JM. Similarly, it is also possible for the
user-defined operator to register a timer
via InternalTimerService#registerEventTimeTimer and emit output to an
external service inside Triggerable#onEventTime. There is a chance that
users still need related logic to output data in real-time, even if the
downstream operators have isOutputOnEOF=true.

One possible solution to address this problem is to add an extra
OperatorAttribute to specify whether this operator might output records in
such a way that does not go through operator's output (e.g. side output).
Then the JM can safely enable the runtime optimization currently described
in the FLIP when there is no such operator.

3) Create a follow-up FLIP that allows users to specify whether a source
with Boundedness=bounded should have isProcessingBacklog=true.

This capability would effectively introduce a 3rd strategy to set backlog
status (in addition to FLIP-309 and FLIP-328). It might be useful to note
that, even though the data in bounded sources are backlog data in most
practical use-cases, it is not necessarily true. For example, users might
want to start a Flink job to consume real-time data from a Kafka topic and
specify that the job stops after 24 hours, which means the source is
technically bounded while the data is fresh/real-time.

This capability is more generic and can cover more use-case than
EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be
useful in cases where users already need to specify this window assigner in
a DataStream program, without bothering users to decide whether it is safe
to treat data in a bounded source as backlog data.


Regards,
Dong






On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:

> Hi, Dong,
> Thanks for your efforts.
>
> +1 to this proposal,
> I believe this will improve the performance in some mixture circumstances
> of bounded and unbounded workloads.
>
> Best,
> Yuxin
>
>
> Xintong Song <tonysong...@gmail.com> 于2023年9月18日周一 10:56写道:
>
> > Thanks for addressing my comments, Dong.
> >
> > LGTM.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com>
> wrote:
> >
> > > Hi Dong & Jinhao,
> > >
> > > Thanks for your clarification! +1
> > >
> > > Best regards,
> > > Wencong
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
> > > >Hi Wencong,
> > > >
> > > >Thanks for your comments! Please see my reply inline.
> > > >
> > > >On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com>
> > > wrote:
> > > >
> > > >> Dear Dong,
> > > >>
> > > >> I have thoroughly reviewed the proposal for FLIP-331 and believe it
> > > would
> > > >> be
> > > >> a valuable addition to Flink. However, I do have a few questions
> that
> > I
> > > >> would
> > > >> like to discuss:
> > > >>
> > > >>
> > > >> 1. The FLIP-331 proposed the EndOfStreamWindows that is implemented
> by
> > > >> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1), which naturally
> > > >> supports WindowedStream and AllWindowedStream to process all records
> > > >> belonging to a key in a 'global' window under both STREAMING and
> BATCH
> > > >> runtime execution mode.
> > > >>
> > > >>
> > > >> However, besides coGroup and keyBy().aggregate(), other operators on
> > > >> WindowedStream and AllWindowedStream, such as join/reduce, etc,
> > > currently
> > > >> are still implemented based on WindowOperator.
> > > >>
> > > >>
> > > >> In fact, these operators can also be implemented without using
> > > >> WindowOperator
> > > >> to prevent additional WindowAssigner#assignWindows or
> > > >> triggerContext#onElement
> > > >> invocation cost. Will there be plans to support these operators in
> the
> > > >> future?
> > > >>
> > > >
> > > >You are right. The EndOfStreamWindows proposed in this FLIP can
> > > potentially
> > > >benefit any DataStream API that takes WindowAssigner as parameters.
> This
> > > >can involve more operations than aggregate and co-group.
> > > >
> > > >And yes, we have plans to take advantage of this API to optimize these
> > > >operators in the future. This FLIP focuses on the introduction of the
> > > >public APIs and uses aggregate/co-group as the first two examples to
> > > >show-case the performance benefits.
> > > >
> > > >I have added a "Analysis of APIs affected by this FLIP" to list the
> > > >DataStream APIs that can benefit from this FLIP. Would this answer
> your
> > > >question?
> > > >
> > > >
> > > >>
> > > >> 2. When using EndOfStreamWindows, upstream operators no longer
> support
> > > >> checkpointing. This limit may be too strict, especially when dealing
> > > with
> > > >> bounded data in streaming runtime execution mode, where
> checkpointing
> > > >> can still be useful.
> > > >>
> > > >
> > > >I am not sure we have a good way to support checkpoint while still
> > > >achieving the performance improves targeted by this FLIP.
> > > >
> > > >The issue here is that if we support checkpoint, then we can not take
> > > >advantage of algorithms (e.g. sorting inputs using ExternalSorter)
> that
> > > are
> > > >not compatible with checkpoints. These algorithms (which do not
> support
> > > >checkpoint) are the main reasons why batch mode currently
> significantly
> > > >outperforms stream mode in doing aggregation/cogroup etc.
> > > >
> > > >In most cases where the user does not care about processing latency,
> it
> > is
> > > >generally preferred to use batch mode to perform aggregation
> operations
> > > >(which should be 10X faster than the existing stream mode performance)
> > > >instead of doing checkpoint.
> > > >
> > > >Also note that we can still let operators perform failover in the same
> > as
> > > >the existing batch mode execution, where the intermediate results
> > > (produced
> > > >by one operator) can be persisted in shuffle service and downstream
> > > >operators can re-read those data from shuffle service after failover.
> > > >
> > > >
> > > >>
> > > >> 3. The proposal mentions that if a transformation has isOutputOnEOF
> ==
> > > >> true, the
> > > >> operator as well as its upstream operators will be executed in
> 'batch
> > > >> mode' with
> > > >> checkpointing disabled. I would like to understand the specific
> > > >> implications of this
> > > >> 'batch mode' and if there are any other changes associated with it?
> > > >
> > > >
> > > >Good point. We should explicitly mention the changes. I have updated
> the
> > > >FLIP to clarify this.
> > > >
> > > >More specifically, the checkpoint is disabled when these operators are
> > > >running, such that these operators can do operations not compatible
> with
> > > >checkpoints (e.g. sorting inputs). And operators should re-read the
> data
> > > >from the upstream blocking edge or sources after failover.
> > > >
> > > >Would this answer your question?
> > > >
> > > >
> > > >>
> > > >> Additionally, I am curious to know if this 'batch mode' conflicts
> with
> > > the
> > > >> 'mix mode'
> > > >>
> > > >> described in FLIP-327. While the coGroup and keyBy().aggregate()
> > > operators
> > > >> on
> > > >> EndOfStreamWindows have the attribute 'isInternalSorterSupported'
> set
> > to
> > > >> true,
> > > >> indicating support for the 'mixed mode', they also have
> isOutputOnEOF
> > > set
> > > >> to true,
> > > >> which suggests that the upstream operators should be executed in
> > 'batch
> > > >> mode'.
> > > >> Will the 'mixed mode' be ignored when in 'batch mode'? I would
> > > appreciate
> > > >> any
> > > >> clarification on this matter.
> > > >>
> > > >
> > > >Good question. I think `isInternalSorterSupported` and `isOutputOnEOF`
> > do
> > > >not conflict with each other.
> > > >
> > > >It might be useful to recap the semantics of these attributes:
> > > >- `isOutputOnEOF` describes whether an operator outputs data only
> after
> > > all
> > > >its input has been ingested by the operator.
> > > >-  `isInternalSorterSupported` describes whether an operator will use
> an
> > > >internal sorter when it does not need to do checkpoints.
> > > >
> > > >And we can further derive that these semantics of two attributes do
> not
> > > >conflict with each other. And we can have valid operators with any of
> > the
> > > >four combinations of true/false values for these two attributes.
> > > >
> > > >In the specific example you described above, let's say isOutputOnEOF =
> > > true
> > > >and isInternalSorterSupported = true. According to FLIP-331, the
> > > checkpoint
> > > >is disabled when this operator is running. And according to FLIP-327,
> > this
> > > >operator will sort data internally, which means that Flink runtime
> > should
> > > >not additionally sort its inputs. So overall the Flink job can comply
> > with
> > > >the semantics of these two attributes consistently.
> > > >
> > > >
> > > >Thanks again for taking time to review this FLIP. Please let me know
> > what
> > > >you think.
> > > >
> > > >Best regards,
> > > >Dong
> > > >
> > > >
> > > >> Thank you for taking the time to consider my feedback. I eagerly
> await
> > > >> your response.
> > > >>
> > > >> Best regards,
> > > >>
> > > >> Wencong Liu
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > > >> >Hi all,
> > > >> >
> > > >> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331:
> > > Support
> > > >> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize
> > > task
> > > >> >deployment. The design doc can be found at
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > > >> >.
> > > >> >
> > > >> >This FLIP introduces isOutputOnEOF operator attribute that
> JobManager
> > > can
> > > >> >use to optimize task deployment and resource utilization. In
> > addition,
> > > it
> > > >> >also adds EndOfStreamWindows that can be used with the DataStream
> > APIs
> > > >> >(e.g. cogroup, aggregate) to significantly increase throughput and
> > > reduce
> > > >> >resource utilization.
> > > >> >
> > > >> >We would greatly appreciate any comment or feedback you may have on
> > > this
> > > >> >proposal.
> > > >> >
> > > >> >Cheers,
> > > >> >Dong
> > > >>
> > >
> >
>

Reply via email to