Hi Wencong,

Thanks for your comments! Please see my reply inline.

On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:

> Dear Dong,
>
> I have thoroughly reviewed the proposal for FLIP-331 and believe it would
> be
> a valuable addition to Flink. However, I do have a few questions that I
> would
> like to discuss:
>
>
> 1. The FLIP-331 proposed the EndOfStreamWindows that is implemented by
> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1), which naturally
> supports WindowedStream and AllWindowedStream to process all records
> belonging to a key in a 'global' window under both STREAMING and BATCH
> runtime execution mode.
>
>
> However, besides coGroup and keyBy().aggregate(), other operators on
> WindowedStream and AllWindowedStream, such as join/reduce, etc, currently
> are still implemented based on WindowOperator.
>
>
> In fact, these operators can also be implemented without using
> WindowOperator
> to prevent additional WindowAssigner#assignWindows or
> triggerContext#onElement
> invocation cost. Will there be plans to support these operators in the
> future?
>

You are right. The EndOfStreamWindows proposed in this FLIP can potentially
benefit any DataStream API that takes a WindowAssigner as a parameter. This
covers more operations than just aggregate and co-group.

And yes, we plan to take advantage of this API to optimize these
operators in the future. This FLIP focuses on introducing the
public APIs and uses aggregate/co-group as the first two examples to
showcase the performance benefits.

I have added an "Analysis of APIs affected by this FLIP" section to list
the DataStream APIs that can benefit from this FLIP. Would this answer your
question?


>
> 2. When using EndOfStreamWindows, upstream operators no longer support
> checkpointing. This limit may be too strict, especially when dealing with
> bounded data in streaming runtime execution mode, where checkpointing
> can still be useful.
>

I am not sure we have a good way to support checkpointing while still
achieving the performance improvements targeted by this FLIP.

The issue here is that if we support checkpointing, then we cannot take
advantage of algorithms (e.g. sorting inputs using ExternalSorter) that are
not compatible with checkpoints. These algorithms (which do not support
checkpointing) are the main reason why batch mode currently significantly
outperforms stream mode for aggregation/cogroup etc.
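
To illustrate the kind of algorithm I mean, here is a rough sketch (not the
actual Flink code). It sorts all records by key once the input has ended and
then aggregates each key in a single pass, keeping only one accumulator in
memory at a time. The class and method names are made up for this email, and
an in-memory list sort stands in for ExternalSorter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
final class SortedAggregationSketch {

    // Sums values per key by sorting first and then scanning runs of equal keys.
    static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> input) {
        // Assumption: the whole bounded input is available before we start,
        // which is why no output can be produced until end of input.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey());

        Map<String, Long> result = new LinkedHashMap<>();
        String currentKey = null;
        long acc = 0L; // the only per-key state held at any time
        for (Map.Entry<String, Long> e : sorted) {
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                result.put(currentKey, acc); // key run finished, emit it
                acc = 0L;
            }
            currentKey = e.getKey();
            acc += e.getValue();
        }
        if (currentKey != null) {
            result.put(currentKey, acc); // emit the last key run
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input =
                List.of(Map.entry("b", 2L), Map.entry("a", 1L), Map.entry("b", 3L));
        System.out.println(sumPerKey(input)); // prints {a=1, b=5}
    }
}
```

Nothing is emitted until the sort has consumed all input, and the partially
sorted data is not something a checkpoint would capture in this sketch, which
is the trade-off described above.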

In most cases where the user does not care about processing latency, it is
generally preferable to run aggregation operations in batch mode
(which should be 10X faster than the existing stream mode performance)
rather than to keep checkpointing enabled.

Also note that we can still let operators perform failover in the same way
as the existing batch mode execution, where the intermediate results
(produced by one operator) can be persisted in the shuffle service and
downstream operators can re-read that data from the shuffle service after
failover.


>
> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> true, the
> operator as well as its upstream operators will be executed in 'batch
> mode' with
> checkpointing disabled. I would like to understand the specific
> implications of this
> 'batch mode' and if there are any other changes associated with it?


Good point. We should explicitly mention the changes. I have updated the
FLIP to clarify this.

More specifically, checkpointing is disabled while these operators are
running, so that these operators can perform operations that are not
compatible with checkpoints (e.g. sorting inputs). After failover, operators
should re-read the data from the upstream blocking edge or from the sources.

Would this answer your question?


>
> Additionally, I am curious to know if this 'batch mode' conflicts with the
> 'mix mode'
>
> described in FLIP-327. While the coGroup and keyBy().aggregate() operators
> on
> EndOfStreamWindows have the attribute 'isInternalSorterSupported' set to
> true,
> indicating support for the 'mixed mode', they also have isOutputOnEOF set
> to true,
> which suggests that the upstream operators should be executed in 'batch
> mode'.
> Will the 'mixed mode' be ignored when in 'batch mode'? I would appreciate
> any
> clarification on this matter.
>

Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do
not conflict with each other.

It might be useful to recap the semantics of these attributes:
- `isOutputOnEOF` describes whether an operator outputs data only after all
  its input has been ingested by the operator.
- `isInternalSorterSupported` describes whether an operator will use an
  internal sorter when it does not need to do checkpoints.

From this we can derive that the semantics of these two attributes do not
conflict with each other, and that an operator can be valid with any of the
four combinations of true/false values for these two attributes.

In the specific example you described above, let's say isOutputOnEOF = true
and isInternalSorterSupported = true. According to FLIP-331, checkpointing
is disabled while this operator is running. And according to FLIP-327, this
operator will sort data internally, which means that the Flink runtime
should not additionally sort its inputs. So overall the Flink job can comply
with the semantics of these two attributes consistently.
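
To make the four combinations concrete, here is a small sketch of the
reasoning above. It is not the Flink implementation: the class, field, and
method names are hypothetical, and it assumes a job where checkpointing
would otherwise be enabled, so the only reason an operator "does not need to
do checkpoints" is that isOutputOnEOF is true.

```java
// Hypothetical illustration only; these are not Flink classes or methods.
final class AttributeSketch {

    // Stand-in for the two operator attributes discussed in this thread.
    static final class Attrs {
        final boolean outputOnEOF;
        final boolean internalSorterSupported;

        Attrs(boolean outputOnEOF, boolean internalSorterSupported) {
            this.outputOnEOF = outputOnEOF;
            this.internalSorterSupported = internalSorterSupported;
        }
    }

    // FLIP-331: checkpointing is disabled while an isOutputOnEOF operator runs.
    static boolean checkpointingDisabled(Attrs a) {
        return a.outputOnEOF;
    }

    // FLIP-327: the operator sorts internally when it supports an internal
    // sorter and does not need to checkpoint (assumption stated above).
    static boolean usesInternalSorter(Attrs a) {
        return a.internalSorterSupported && checkpointingDisabled(a);
    }

    static String describe(Attrs a) {
        return "isOutputOnEOF=" + a.outputOnEOF
                + ", isInternalSorterSupported=" + a.internalSorterSupported
                + " -> checkpointingDisabled=" + checkpointingDisabled(a)
                + ", usesInternalSorter=" + usesInternalSorter(a);
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // Enumerate all four combinations; each yields a well-defined outcome.
        for (boolean eof : values) {
            for (boolean sorter : values) {
                System.out.println(describe(new Attrs(eof, sorter)));
            }
        }
    }
}
```

In the (true, true) case the sketch reports that checkpointing is disabled
and that the operator sorts internally, which matches the example above: the
runtime does not need to add its own sort in front of the operator.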


Thanks again for taking the time to review this FLIP. Please let me know
what you think.

Best regards,
Dong


> Thank you for taking the time to consider my feedback. I eagerly await
> your response.
>
> Best regards,
>
> Wencong Liu
>
> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> >Hi all,
> >
> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
> >deployment. The design doc can be found at
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> >.
> >
> >This FLIP introduces isOutputOnEOF operator attribute that JobManager can
> >use to optimize task deployment and resource utilization. In addition, it
> >also adds EndOfStreamWindows that can be used with the DataStream APIs
> >(e.g. cogroup, aggregate) to significantly increase throughput and reduce
> >resource utilization.
> >
> >We would greatly appreciate any comment or feedback you may have on this
> >proposal.
> >
> >Cheers,
> >Dong
>
