Thanks

Best,

Xintong



On Wed, Jan 10, 2024 at 5:56 PM Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi all,
>
> After several rounds of offline discussions with Xintong and Jinhao,
> we have decided to narrow the scope of the FLIP. It will now focus on
> introducing OperatorAttributes that indicate whether an operator emits
> records only after inputs have ended. We will also use the attribute
> to optimize task scheduling for better resource utilization. Setting
> the backlog status and optimizing the operator implementation during
> the backlog will be deferred to future work.
>
> In addition to the change above, we also make the following changes to
> the FLIP to address the problems mentioned by Dong:
> - Public interfaces are updated to reuse the GlobalWindows.
> - Instead of making all outputs of the upstream operators of the
> "isOutputOnlyAfterEndOfStream=true" operator blocking, we only make
> the output of the operator with "isOutputOnlyAfterEndOfStream=true"
> blocking. This can prevent the second problem Dong mentioned. In the
> future, we may introduce an extra OperatorAttributes to indicate if an
> operator has any side output.
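The narrowed edge rule in the second bullet above can be sketched as follows. This is a minimal, self-contained Java model and not Flink code: the `Op` class, the `EdgeType` enum, and the `outputEdgeTypes` method are hypothetical names introduced only for illustration, and the job is assumed to be a simple linear chain of operators.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; none of these types are Flink classes.
final class BlockingEdgeSketch {

    /** Stand-in for an operator carrying the attribute discussed in the thread. */
    static final class Op {
        final String name;
        final boolean outputOnlyAfterEndOfStream;

        Op(String name, boolean outputOnlyAfterEndOfStream) {
            this.name = name;
            this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
        }
    }

    enum EdgeType { PIPELINED, BLOCKING }

    /**
     * Returns the type of the output edge of each operator in a linear chain.
     * Only the operator that emits after end of stream gets a blocking output;
     * the outputs of its upstream operators stay pipelined.
     */
    static List<EdgeType> outputEdgeTypes(List<Op> chain) {
        List<EdgeType> edges = new ArrayList<>();
        for (Op op : chain) {
            edges.add(op.outputOnlyAfterEndOfStream ? EdgeType.BLOCKING : EdgeType.PIPELINED);
        }
        return edges;
    }

    public static void main(String[] args) {
        List<Op> chain = List.of(
                new Op("source", false),
                new Op("map", false),
                new Op("aggregate", true));
        List<EdgeType> edges = outputEdgeTypes(chain);
        for (int i = 0; i < chain.size(); i++) {
            System.out.println(chain.get(i).name + " -> " + edges.get(i));
        }
    }
}
```

In this sketch the decision for each edge depends only on the operator that produces it, which is why upstream operators that write to external services in real time are left unaffected.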
>
> I would greatly appreciate any comment or feedback you may have on the
> updated FLIP.
>
> Best regards,
> Xuannan
>
> On Tue, Sep 26, 2023 at 11:24 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi all,
> >
> > Thanks for the review!
> >
> > Becket and I discussed this FLIP offline and we agreed on several things
> > that need to be improved with this FLIP. I will summarize our discussion
> > with the problems and TODOs. We will update the FLIP and let you know once
> > the FLIP is ready for review again.
> >
> > 1) Investigate whether it is possible to update the existing GlobalWindows
> > in a backward-compatible way and re-use it for the same purpose
> > as EndOfStreamWindows, without introducing EndOfStreamWindows as a new
> > class.
> >
> > Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance,
> > which will not trigger the window's computation even at end of input. We will
> > need to investigate its existing usage and see if we can re-use it in a
> > backward-compatible way.
> >
> > 2) Let JM know whether any operator in the upstream of the operator with
> > "isOutputOnEOF=true" will emit output via any side channel. The FLIP should
> > update the execution mode of those operators *only if* all outputs from
> > those operators are emitted only at the end of input.
> >
> > More specifically, the upstream operator might involve a user-defined
> > operator that might emit output directly to an external service, where the
> > emission operation is not explicitly expressed as an operator's output edge
> > and thus not visible to JM. Similarly, it is also possible for the
> > user-defined operator to register a timer
> > via InternalTimerService#registerEventTimeTimer and emit output to an
> > external service inside Triggerable#onEventTime. There is a chance that
> > users still need related logic to output data in real-time, even if the
> > downstream operators have isOutputOnEOF=true.
> >
> > One possible solution to address this problem is to add an extra
> > OperatorAttribute to specify whether this operator might output records in
> > such a way that does not go through operator's output (e.g. side output).
> > Then the JM can safely enable the runtime optimization currently described
> > in the FLIP when there is no such operator.
> >
> > 3) Create a follow-up FLIP that allows users to specify whether a source
> > with Boundedness=bounded should have isProcessingBacklog=true.
> >
> > This capability would effectively introduce a 3rd strategy to set backlog
> > status (in addition to FLIP-309 and FLIP-328). It might be useful to note
> > that, even though the data in bounded sources are backlog data in most
> > practical use-cases, it is not necessarily true. For example, users might
> > want to start a Flink job to consume real-time data from a Kafka topic and
> > specify that the job stops after 24 hours, which means the source is
> > technically bounded while the data is fresh/real-time.
> >
> > This capability is more generic and can cover more use cases than
> > EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be
> > useful in cases where users already need to specify this window assigner in
> > a DataStream program, without requiring users to decide whether it is safe
> > to treat data in a bounded source as backlog data.
> >
> >
> > Regards,
> > Dong
> >
> > On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
> >
> > > Hi, Dong,
> > > Thanks for your efforts.
> > >
> > > +1 to this proposal,
> > > I believe this will improve performance in some scenarios that mix
> > > bounded and unbounded workloads.
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > On Mon, Sep 18, 2023 at 10:56 AM Xintong Song <tonysong...@gmail.com> wrote:
> > >
> > > > Thanks for addressing my comments, Dong.
> > > >
> > > > LGTM.
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com> wrote:
> > > >
> > > > > Hi Dong & Jinhao,
> > > > >
> > > > > Thanks for your clarification! +1
> > > > >
> > > > > Best regards,
> > > > > Wencong
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
> > > > > >Hi Wencong,
> > > > > >
> > > > > >Thanks for your comments! Please see my reply inline.
> > > > > >
> > > > > > >On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
> > > > > >
> > > > > >> Dear Dong,
> > > > > >>
> > > > > > >> I have thoroughly reviewed the proposal for FLIP-331 and believe it would be
> > > > > > >> a valuable addition to Flink. However, I do have a few questions that I would
> > > > > > >> like to discuss:
> > > > > >>
> > > > > >>
> > > > > > >> 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a
> > > > > > >> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1) and naturally
> > > > > > >> supports WindowedStream and AllWindowedStream to process all records
> > > > > > >> belonging to a key in a 'global' window under both STREAMING and BATCH
> > > > > > >> runtime execution mode.
> > > > > >>
> > > > > >>
> > > > > > >> However, besides coGroup and keyBy().aggregate(), other operators on
> > > > > > >> WindowedStream and AllWindowedStream, such as join/reduce, etc., currently
> > > > > > >> are still implemented based on WindowOperator.
> > > > > >>
> > > > > >>
> > > > > > >> In fact, these operators can also be implemented without using
> > > > > > >> WindowOperator, which would avoid the additional
> > > > > > >> WindowAssigner#assignWindows or triggerContext#onElement invocation
> > > > > > >> cost. Are there plans to support these operators in the future?
> > > > > >>
> > > > > >
> > > > > > >You are right. The EndOfStreamWindows proposed in this FLIP can potentially
> > > > > > >benefit any DataStream API that takes WindowAssigner as parameters. This
> > > > > > >can involve more operations than aggregate and co-group.
> > > > > >
> > > > > > >And yes, we have plans to take advantage of this API to optimize these
> > > > > > >operators in the future. This FLIP focuses on the introduction of the
> > > > > > >public APIs and uses aggregate/co-group as the first two examples to
> > > > > > >show-case the performance benefits.
> > > > > >
> > > > > > >I have added an "Analysis of APIs affected by this FLIP" section to list the
> > > > > > >DataStream APIs that can benefit from this FLIP. Would this answer your
> > > > > > >question?
> > > > > >
> > > > > >
> > > > > >>
> > > > > > >> 2. When using EndOfStreamWindows, upstream operators no longer support
> > > > > > >> checkpointing. This limit may be too strict, especially when dealing with
> > > > > > >> bounded data in streaming runtime execution mode, where checkpointing
> > > > > > >> can still be useful.
> > > > > >>
> > > > > >
> > > > > > >I am not sure we have a good way to support checkpointing while still
> > > > > > >achieving the performance improvements targeted by this FLIP.
> > > > > >
> > > > > > >The issue here is that if we support checkpointing, then we cannot take
> > > > > > >advantage of algorithms (e.g. sorting inputs using ExternalSorter) that are
> > > > > > >not compatible with checkpoints. These algorithms (which do not support
> > > > > > >checkpointing) are the main reason why batch mode currently significantly
> > > > > > >outperforms stream mode in doing aggregation/cogroup etc.
> > > > > >
> > > > > > >In most cases where the user does not care about processing latency, it is
> > > > > > >generally preferred to use batch mode to perform aggregation operations
> > > > > > >(which should be 10X faster than the existing stream mode performance)
> > > > > > >instead of doing checkpoints.
> > > > > >
> > > > > > >Also note that we can still let operators perform failover in the same way as
> > > > > > >the existing batch mode execution, where the intermediate results (produced
> > > > > > >by one operator) can be persisted in the shuffle service and downstream
> > > > > > >operators can re-read that data from the shuffle service after failover.
> > > > > >
> > > > > >
> > > > > >>
> > > > > > >> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> > > > > > >> true, the operator as well as its upstream operators will be executed in
> > > > > > >> 'batch mode' with checkpointing disabled. I would like to understand the
> > > > > > >> specific implications of this 'batch mode' and if there are any other
> > > > > > >> changes associated with it?
> > > > > >
> > > > > >
> > > > > > >Good point. We should explicitly mention the changes. I have updated the
> > > > > > >FLIP to clarify this.
> > > > > >
> > > > > > >More specifically, the checkpoint is disabled when these operators are
> > > > > > >running, such that these operators can do operations not compatible with
> > > > > > >checkpoints (e.g. sorting inputs). And operators should re-read the data
> > > > > > >from the upstream blocking edge or sources after failover.
> > > > > >
> > > > > >Would this answer your question?
> > > > > >
> > > > > >
> > > > > >>
> > > > > > >> Additionally, I am curious to know if this 'batch mode' conflicts with the
> > > > > > >> 'mixed mode' described in FLIP-327. While the coGroup and
> > > > > > >> keyBy().aggregate() operators on EndOfStreamWindows have the attribute
> > > > > > >> 'isInternalSorterSupported' set to true, indicating support for the
> > > > > > >> 'mixed mode', they also have isOutputOnEOF set to true, which suggests
> > > > > > >> that the upstream operators should be executed in 'batch mode'.
> > > > > > >> Will the 'mixed mode' be ignored when in 'batch mode'? I would appreciate
> > > > > > >> any clarification on this matter.
> > > > > >>
> > > > > >
> > > > > > >Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do
> > > > > > >not conflict with each other.
> > > > > >
> > > > > > >It might be useful to recap the semantics of these attributes:
> > > > > > >- `isOutputOnEOF` describes whether an operator outputs data only after all
> > > > > > >its input has been ingested by the operator.
> > > > > > >- `isInternalSorterSupported` describes whether an operator will use an
> > > > > > >internal sorter when it does not need to do checkpoints.
> > > > > >
> > > > > > >From this we can derive that the semantics of the two attributes do not
> > > > > > >conflict with each other, and we can have valid operators with any of the
> > > > > > >four combinations of true/false values for these two attributes.
> > > > > >
> > > > > > >In the specific example you described above, let's say isOutputOnEOF = true
> > > > > > >and isInternalSorterSupported = true. According to FLIP-331, the checkpoint
> > > > > > >is disabled when this operator is running. And according to FLIP-327, this
> > > > > > >operator will sort data internally, which means that Flink runtime should
> > > > > > >not additionally sort its inputs. So overall the Flink job can comply with
> > > > > > >the semantics of these two attributes consistently.
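The independence of the two attributes described above can be illustrated with a small, self-contained Java model. This is only a sketch and not Flink code: the class and method names are hypothetical, and it encodes just the two consequences stated in this reply (checkpointing is disabled while an isOutputOnEOF operator runs, and the runtime does not additionally sort the inputs of an operator that sorts internally).

```java
// Hypothetical model for illustration only; these are not Flink APIs.
final class AttributeSemanticsSketch {

    /** Per the FLIP-331 discussion: checkpointing is disabled while such an operator runs. */
    static boolean checkpointDisabledWhileRunning(boolean isOutputOnEOF) {
        return isOutputOnEOF;
    }

    /** Per the FLIP-327 discussion: the runtime should not additionally sort the inputs. */
    static boolean runtimeMustNotSortInputs(boolean isInternalSorterSupported) {
        return isInternalSorterSupported;
    }

    /** Formats the two derived decisions for one combination of attribute values. */
    static String describe(boolean isOutputOnEOF, boolean isInternalSorterSupported) {
        return String.format(
                "isOutputOnEOF=%b, isInternalSorterSupported=%b"
                        + " -> checkpointDisabled=%b, runtimeMustNotSortInputs=%b",
                isOutputOnEOF,
                isInternalSorterSupported,
                checkpointDisabledWhileRunning(isOutputOnEOF),
                runtimeMustNotSortInputs(isInternalSorterSupported));
    }

    public static void main(String[] args) {
        // Each decision reads exactly one attribute, so all four combinations are valid.
        boolean[] values = {false, true};
        for (boolean eof : values) {
            for (boolean sorter : values) {
                System.out.println(describe(eof, sorter));
            }
        }
    }
}
```

Because each derived decision depends on a single attribute, no combination of values can produce contradictory instructions in this model.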
> > > > > >
> > > > > >
> > > > > > >Thanks again for taking time to review this FLIP. Please let me know what
> > > > > > >you think.
> > > > > >
> > > > > >Best regards,
> > > > > >Dong
> > > > > >
> > > > > >
> > > > > > >> Thank you for taking the time to consider my feedback. I eagerly await
> > > > > > >> your response.
> > > > > >>
> > > > > >> Best regards,
> > > > > >>
> > > > > >> Wencong Liu
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > > > > >> >Hi all,
> > > > > >> >
> > > > > > >> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
> > > > > > >> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
> > > > > > >> >deployment. The design doc can be found at
> > > > > > >> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > > > > > >> >.
> > > > > >> >
> > > > > > >> >This FLIP introduces isOutputOnEOF operator attribute that JobManager can
> > > > > > >> >use to optimize task deployment and resource utilization. In addition, it
> > > > > > >> >also adds EndOfStreamWindows that can be used with the DataStream APIs
> > > > > > >> >(e.g. cogroup, aggregate) to significantly increase throughput and reduce
> > > > > > >> >resource utilization.
> > > > > >> >
> > > > > > >> >We would greatly appreciate any comment or feedback you may have on this
> > > > > > >> >proposal.
> > > > > >> >
> > > > > >> >Cheers,
> > > > > >> >Dong
> > > > > >>
> > > > >
> > > >
> > >
>
