Hi, Atiozi

Thanks for your feedback.

> Traverse the ExecNode DAG and create a FusionExecNode  for physical
operators that can be fused together.
which kind of operators can be fused together ? are the operators in an
operator chain? Is this optimization aligned to spark's whole stage codegen
?
In theory, all kinds of operators can be fused together, our final goal is
to support all operators in batch mode, OperatorChain is just one case. Due
to this work effort is relatively large, so we need to complete it step by
step. Our OFCG not only achieves the ability of spark's whole stage
codegen, but also do more better than them.

> does the "support codegen" means fusion codegen? but why we generate a
FusionTransformation when the member operator does not support codegen, IMO
it should
fallback to the current behavior.

yes, it means the fusion codegen. In FLIP, I propose two operator fusion
mechanisms, one is like OperatorChain for single input operator, another is
MultipleInput fusion. For the former, our design mechanism is to fuse all
operators together at the ExecNode layer only if they all support fusion
codegen, or else go over the default OperatorChain. For the latter, in
order not to break the existing MultipleInput optimization purpose, so when
there are member operators that do not support fusion codegen,  we will
fall back to the current behavior[1], which means that a
FusionTransformation is created. here FusionTransformation is just a
surrogate for MultipleInput case, it actually means
MultipleInputTransformation, which fuses multiple physical operators.
Sorry, the description in the flow is not very clear and caused your
confusion.

> In the end, I share the same idea with Lincoln about performance
benchmark.
Currently flink community's flink-benchmark only covers like schedule,
state, datastream operator's performance.
A good benchmark harness for sql operator will benefit the sql optimizer
topic and observation

For the performance benchmark, I agree with you. As I stated earlier, I
think this is a new scope of work, we should design it separately, we can
introduce this improvement in the future.

[1]
https://github.com/apache/flink/blob/77214f138cf759a3ee5466c9b2379e717227a0ae/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java#L123

Best,
Ron

Jing Ge <j...@ververica.com.invalid> 于2023年6月8日周四 04:28写道:

> Hi Ron,
>
> Thanks for raising the proposal. It is a very attractive idea! Since the
> FLIP is a relatively complex one which contains three papers and a design
> doc. It deserves more time for the discussion to make sure everyone is on
> the same page. I have a NIT question which will not block your voting
> process. Previously, it took the community a lot of effort to make Flink
> kinds of scala free. Since the code base of the table module is too big,
> instead of porting to Java, all scala code has been hidden. Furthermore,
> there are ongoing efforts to remove Scala code from Flink. As you can see,
> the community tries to limit (i.e. get rid of) scala code as much as
> possible. I was wondering if it is possible for you to implement the FLIP
> with scala free code?
>
> Best regards,
> Jing
>
> [1] https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/
>
> On Wed, Jun 7, 2023 at 5:33 PM Aitozi <gjying1...@gmail.com> wrote:
>
> > Hi Ron:
> >     Sorry for the late reply after the voting process. I just want to ask
> >
> > > Traverse the ExecNode DAG and create a FusionExecNode  for physical
> > operators that can be fused together.
> > which kind of operators can be fused together ? are the operators in an
> > operator chain? Is this optimization aligned to spark's whole stage
> codegen
> > ?
> >
> > > If any member operator does not support codegen, generate a
> > Transformation DAG based on the topological relationship of member
> ExecNode
> >  and jump to step 8.
> > step8: Generate a FusionTransformation, setting the parallelism and
> managed
> > memory for the fused operator.
> >
> > does the "support codegen" means fusion codegen? but why we generate a
> > FusionTransformation when the member operator does not support codegen,
> IMO
> > it should
> > fallback to the current behavior.
> >
> > In the end, I share the same idea with Lincoln about performance
> benchmark.
> > Currently flink community's flink-benchmark only covers like schedule,
> > state, datastream operator's performance.
> > A good benchmark harness for sql operator will benefit the sql optimizer
> > topic and observation
> >
> > Thanks,
> > Atiozi.
> >
> >
> > liu ron <ron9....@gmail.com> 于2023年6月6日周二 19:30写道:
> >
> > > Hi dev
> > >
> > > Thanks for all the feedback, it seems that here are no more comments, I
> > > will
> > > start a vote on FLIP-315 [1] later. Thanks again.
> > >
> > > [1]:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > >
> > > Best,
> > > Ron
> > >
> > > liu ron <ron9....@gmail.com> 于2023年6月5日周一 16:01写道:
> > >
> > > > Hi, Yun, Jinsong, Benchao
> > > >
> > > > Thanks for your valuable input about this FLIP.
> > > >
> > > > First of all, let me emphasize that from the technical implementation
> > > > point of view, this design is feasible in both stream and batch
> > > scenarios,
> > > > so I consider both stream and batch mode in FLIP. In the stream
> > scenario,
> > > > for stateful operator, according to our business experience,
> basically
> > > the
> > > > bottleneck is on the state access, so the optimization effect of OFCG
> > for
> > > > the stream will not be particularly obvious, so we will not give
> > priority
> > > > to support it currently. On the contrary, in the batch scenario,
> where
> > > CPU
> > > > is the bottleneck, this optimization is gainful.
> > > >
> > > > Taking the above into account, we are able to support both stream and
> > > > batch mode optimization in this design, but we will give priority to
> > > > supporting batch operators. As benchao said, when we find a suitable
> > > > streaming business scenario in the future, we can consider doing this
> > > > optimization. Back to Yun issue, the design will break state
> > > compatibility
> > > > in stream mode as[1] and the version upgrade will not support this
> > OFCG.
> > > As
> > > > mentioned earlier, we will not support this feature in stream mode in
> > the
> > > > short term.
> > > >
> > > > Also thanks to Benchao's suggestion, I will state the current goal of
> > > that
> > > > optimization in the FLIP, scoped to batch mode.
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > liu ron <ron9....@gmail.com> 于2023年6月5日周一 15:04写道:
> > > >
> > > >> Hi, Lincoln
> > > >>
> > > >> Thanks for your appreciation of this design. Regarding your
> question:
> > > >>
> > > >> > do we consider adding a benchmark for the operators to intuitively
> > > >> understand the improvement brought by each improvement?
> > > >>
> > > >> I think it makes sense to add a benchmark, Spark also has this
> > benchmark
> > > >> framework. But I think it is another story to introduce a benchmark
> > > >> framework in Flink, we need to start a new discussion to this work.
> > > >>
> > > >> > for the implementation plan, mentioned in the FLIP that 1.18 will
> > > >> support Calc, HashJoin and HashAgg, then what will be the next step?
> > and
> > > >> which operators do we ultimately expect to cover (all or specific
> > ones)?
> > > >>
> > > >> Our ultimate goal is to support all operators in batch mode, but we
> > > >> prioritize them according to their usage. Operators like Calc,
> > HashJoin,
> > > >> HashAgg, etc. are more commonly used, so we will support them first.
> > > Later
> > > >> we support the rest of the operators step by step. Considering the
> > time
> > > >> factor and the development workload, so we can only support  Calc,
> > > >> HashJoin, HashAgg in 1.18. In 1.19 or 1.20, we will complete the
> rest
> > > work.
> > > >> I will make this clear in FLIP
> > > >>
> > > >> Best,
> > > >> Ron
> > > >>
> > > >> Jingsong Li <jingsongl...@gmail.com> 于2023年6月5日周一 14:15写道:
> > > >>
> > > >>> > For the state compatibility session, it seems that the checkpoint
> > > >>> compatibility would be broken just like [1] did. Could FLIP-190 [2]
> > > still
> > > >>> be helpful in this case for SQL version upgrades?
> > > >>>
> > > >>> I guess this is only for batch processing. Streaming should be
> > another
> > > >>> story?
> > > >>>
> > > >>> Best,
> > > >>> Jingsong
> > > >>>
> > > >>> On Mon, Jun 5, 2023 at 2:07 PM Yun Tang <myas...@live.com> wrote:
> > > >>> >
> > > >>> > Hi Ron,
> > > >>> >
> > > >>> > I think this FLIP would help to improve the performance, looking
> > > >>> forward to its completion in Flink!
> > > >>> >
> > > >>> > For the state compatibility session, it seems that the checkpoint
> > > >>> compatibility would be broken just like [1] did. Could FLIP-190 [2]
> > > still
> > > >>> be helpful in this case for SQL version upgrades?
> > > >>> >
> > > >>> >
> > > >>> > [1]
> > > >>>
> > >
> >
> https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI/edit#heading=h.fri5rtpte0si
> > > >>> > [2]
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489
> > > >>> >
> > > >>> > Best
> > > >>> > Yun Tang
> > > >>> >
> > > >>> > ________________________________
> > > >>> > From: Lincoln Lee <lincoln.8...@gmail.com>
> > > >>> > Sent: Monday, June 5, 2023 10:56
> > > >>> > To: dev@flink.apache.org <dev@flink.apache.org>
> > > >>> > Subject: Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen
> > for
> > > >>> Flink SQL
> > > >>> >
> > > >>> > Hi Ron
> > > >>> >
> > > >>> > OFGC looks like an exciting optimization, looking forward to its
> > > >>> completion
> > > >>> > in Flink!
> > > >>> > A small question, do we consider adding a benchmark for the
> > operators
> > > >>> to
> > > >>> > intuitively understand the improvement brought by each
> improvement?
> > > >>> > In addition, for the implementation plan, mentioned in the FLIP
> > that
> > > >>> 1.18
> > > >>> > will support Calc, HashJoin and HashAgg, then what will be the
> next
> > > >>> step?
> > > >>> > and which operators do we ultimately expect to cover (all or
> > specific
> > > >>> ones)?
> > > >>> >
> > > >>> > Best,
> > > >>> > Lincoln Lee
> > > >>> >
> > > >>> >
> > > >>> > liu ron <ron9....@gmail.com> 于2023年6月5日周一 09:40写道:
> > > >>> >
> > > >>> > > Hi, Jark
> > > >>> > >
> > > >>> > > Thanks for your feedback, according to my initial assessment,
> the
> > > >>> work
> > > >>> > > effort is relatively large.
> > > >>> > >
> > > >>> > > Moreover, I will add a test result of all queries to the FLIP.
> > > >>> > >
> > > >>> > > Best,
> > > >>> > > Ron
> > > >>> > >
> > > >>> > > Jark Wu <imj...@gmail.com> 于2023年6月1日周四 20:45写道:
> > > >>> > >
> > > >>> > > > Hi Ron,
> > > >>> > > >
> > > >>> > > > Thanks a lot for the great proposal. The FLIP looks good to
> me
> > in
> > > >>> > > general.
> > > >>> > > > It looks like not an easy work but the performance sounds
> > > >>> promising. So I
> > > >>> > > > think it's worth doing.
> > > >>> > > >
> > > >>> > > > Besides, if there is a complete test graph with all TPC-DS
> > > >>> queries, the
> > > >>> > > > effect of this FLIP will be more intuitive.
> > > >>> > > >
> > > >>> > > > Best,
> > > >>> > > > Jark
> > > >>> > > >
> > > >>> > > >
> > > >>> > > >
> > > >>> > > > On Wed, 31 May 2023 at 14:27, liu ron <ron9....@gmail.com>
> > > wrote:
> > > >>> > > >
> > > >>> > > > > Hi, Jinsong
> > > >>> > > > >
> > > >>> > > > > Thanks for your valuable suggestions.
> > > >>> > > > >
> > > >>> > > > > Best,
> > > >>> > > > > Ron
> > > >>> > > > >
> > > >>> > > > > Jingsong Li <jingsongl...@gmail.com> 于2023年5月30日周二
> 13:22写道:
> > > >>> > > > >
> > > >>> > > > > > Thanks Ron for your information.
> > > >>> > > > > >
> > > >>> > > > > > I suggest that it can be written in the Motivation of
> FLIP.
> > > >>> > > > > >
> > > >>> > > > > > Best,
> > > >>> > > > > > Jingsong
> > > >>> > > > > >
> > > >>> > > > > > On Tue, May 30, 2023 at 9:57 AM liu ron <
> > ron9....@gmail.com>
> > > >>> wrote:
> > > >>> > > > > > >
> > > >>> > > > > > > Hi, Jingsong
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks for your review. We have tested it in TPC-DS
> case,
> > > >>> and got a
> > > >>> > > > 12%
> > > >>> > > > > > > gain overall when only supporting only
> > > Calc&HashJoin&HashAgg
> > > >>> > > > operator.
> > > >>> > > > > In
> > > >>> > > > > > > some queries, we even get more than 30% gain, it looks
> > like
> > > >>> an
> > > >>> > > > > effective
> > > >>> > > > > > > way.
> > > >>> > > > > > >
> > > >>> > > > > > > Best,
> > > >>> > > > > > > Ron
> > > >>> > > > > > >
> > > >>> > > > > > > Jingsong Li <jingsongl...@gmail.com> 于2023年5月29日周一
> > > 14:33写道:
> > > >>> > > > > > >
> > > >>> > > > > > > > Thanks Ron for the proposal.
> > > >>> > > > > > > >
> > > >>> > > > > > > > Do you have some benchmark results for the
> performance
> > > >>> > > > improvement? I
> > > >>> > > > > > > > am more concerned about the improvement on Flink than
> > the
> > > >>> data in
> > > >>> > > > > > > > other papers.
> > > >>> > > > > > > >
> > > >>> > > > > > > > Best,
> > > >>> > > > > > > > Jingsong
> > > >>> > > > > > > >
> > > >>> > > > > > > > On Mon, May 29, 2023 at 2:16 PM liu ron <
> > > >>> ron9....@gmail.com>
> > > >>> > > > wrote:
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > Hi, dev
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > I'd like to start a discussion about FLIP-315:
> > Support
> > > >>> Operator
> > > >>> > > > > > Fusion
> > > >>> > > > > > > > > Codegen for Flink SQL[1]
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > As main memory grows, query performance is more and
> > > more
> > > >>> > > > determined
> > > >>> > > > > > by
> > > >>> > > > > > > > the
> > > >>> > > > > > > > > raw CPU costs of query processing itself, this is
> due
> > > to
> > > >>> the
> > > >>> > > > query
> > > >>> > > > > > > > > processing techniques based on interpreted
> execution
> > > >>> shows poor
> > > >>> > > > > > > > performance
> > > >>> > > > > > > > > on modern CPUs due to lack of locality and frequent
> > > >>> instruction
> > > >>> > > > > > > > > mis-prediction. Therefore, the industry is also
> > > >>> researching how
> > > >>> > > > to
> > > >>> > > > > > > > improve
> > > >>> > > > > > > > > engine performance by increasing operator execution
> > > >>> efficiency.
> > > >>> > > > In
> > > >>> > > > > > > > > addition, during the process of optimizing Flink's
> > > >>> performance
> > > >>> > > > for
> > > >>> > > > > > TPC-DS
> > > >>> > > > > > > > > queries, we found that a significant amount of CPU
> > time
> > > >>> was
> > > >>> > > spent
> > > >>> > > > > on
> > > >>> > > > > > > > > virtual function calls, framework collector calls,
> > and
> > > >>> invalid
> > > >>> > > > > > > > > calculations, which can be optimized to improve the
> > > >>> overall
> > > >>> > > > engine
> > > >>> > > > > > > > > performance. After some investigation, we found
> > > Operator
> > > >>> Fusion
> > > >>> > > > > > Codegen
> > > >>> > > > > > > > > which is proposed by Thomas Neumann in the paper[2]
> > can
> > > >>> address
> > > >>> > > > > these
> > > >>> > > > > > > > > problems. I have finished a PoC[3] to verify its
> > > >>> feasibility
> > > >>> > > and
> > > >>> > > > > > > > validity.
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > Looking forward to your feedback.
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > [1]:
> > > >>> > > > > > > > >
> > > >>> > > > > > > >
> > > >>> > > > > >
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > > >>> > > > > > > > > [2]:
> http://www.vldb.org/pvldb/vol4/p539-neumann.pdf
> > > >>> > > > > > > > > [3]: https://github.com/lsyldliu/flink/tree/OFCG
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > Best,
> > > >>> > > > > > > > > Ron
> > > >>> > > > > > > >
> > > >>> > > > > >
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>>
> > > >>
> > >
> >
>

Reply via email to