Hi devs,

I just wanted to give an update on this FLIP.
I updated the doc based on the comments from Jim.
Also, I developed a prototype and did some testing.

With my small prototype, I ran the following tests:

   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
   - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4


These are end-to-end (e2e) DAG optimization tests: they cover query parsing,
validation, optimization, and checking of the resulting plan.

In these e2e optimization tests, my prototype was 15-20% faster than the
existing Flink optimization structure (with the "cost" of simplifying the
codebase).
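
For anyone who wants to run a similar comparison, below is a minimal timing
sketch in Java. It is not the harness behind the numbers above; the class
name OptimizeTimingSketch, the helpers meanNanos and speedupPercent, and the
busyWork placeholder are all hypothetical and stand in for "run one
optimization pass". It only shows the general shape: warm up, time several
runs, and report the relative difference.

```java
public class OptimizeTimingSketch {

    // Written to by busyWork so the JIT cannot discard the loop as dead code.
    private static volatile long sink;

    /**
     * Runs the task `warmups` times untimed, then returns the mean wall-clock
     * time in nanoseconds over `runs` timed invocations.
     */
    public static long meanNanos(Runnable task, int warmups, int runs) {
        if (runs <= 0) {
            throw new IllegalArgumentException("runs must be positive");
        }
        for (int i = 0; i < warmups; i++) {
            task.run();
        }
        long total = 0;
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.run();
            total += System.nanoTime() - start;
        }
        return total / runs;
    }

    /**
     * Relative speedup of the candidate over the baseline, in percent.
     * A positive value means the candidate is faster.
     */
    public static double speedupPercent(long baselineNanos, long candidateNanos) {
        if (baselineNanos <= 0) {
            throw new IllegalArgumentException("baseline must be positive");
        }
        return 100.0 * (baselineNanos - candidateNanos) / baselineNanos;
    }

    /** Hypothetical placeholder for "optimize one multi-sink query". */
    static void busyWork(int n) {
        long acc = 0;
        for (int i = 0; i < n; i++) {
            acc += i % 7;
        }
        sink = acc;
    }

    public static void main(String[] args) {
        // Assumption: in a real comparison these would call the existing
        // optimizer and the prototype on the same query.
        Runnable baseline = () -> busyWork(2_000_000);
        Runnable candidate = () -> busyWork(1_600_000);

        long base = meanNanos(baseline, 5, 20);
        long cand = meanNanos(candidate, 5, 20);
        System.out.printf("baseline=%d ns, candidate=%d ns, speedup=%.1f%%%n",
                base, cand, speedupPercent(base, cand));
    }
}
```

Wall-clock timings like this are noisy, so the measured percentage will vary
between runs and machines; a proper benchmark framework would be the better
choice for anything beyond a rough check.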


Any questions/comments are more than welcome.


Regards,

Jeyhun Karimov

On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Jim,
>
> Thanks for your comments. Please find my answers below:
>
>>    1. StreamOptimizeContext may still be needed to pass the fact that we
>>    are optimizing a streaming query.  I don't think this class will go
>>    away completely.  (I agree it may become more simple if the kind or
>>    mini-batch configuration can be removed.)
>
>
> What I meant is that it might go away if we get rid of
> the *isUpdateBeforeRequired* and *getMiniBatchInterval* fields.
> Of course, if we can get rid of only one of them, then the
> *StreamOptimizeContext* class will not be removed but will get simpler.
> Will update the doc accordingly.
>
>>    2. How are the mini-batch and changelog inference rules tightly coupled?
>>    I looked a little bit and I haven't seen any connection between them.
>>    It seems like the changelog inference is what needs to run multiple
>>    times.
>
>
> Sorry for the misunderstanding. The mini-batch and changelog inference are
> not coupled with each other, but with the high-level optimization logic.
> The idea is to separate the query optimization into 1) optimize, 2) enrich
> with changelog inference, 3) enrich with mini-batch interval inference, and
> 4) rewrite.
>
>>    3. I think your point about code complexity is unnecessary.
>>    StreamOptimizeContext extends org.apache.calcite.plan.Context which is
>>    used as an interface to pass information and objects through the
>>    Calcite stack.
>
>
> I partially agree. Please see my answer to question 1 above.
>
>>    4. Is there an alternative where the complexity of the changelog
>>    optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
>>    (If this is coupling between the mini-batch and changelog rules, then
>>    this would not make sense.)
>
>
> Good point. Yes, this is definitely an alternative.
>
>>    5. There are some other smaller refactorings.  I tried some of them
>>    here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
>>    and using lazy vals to avoid recomputing various things.  (Feel free to
>>    take whatever actually works; I haven't run the tests.)
>
>
> I took a look at your PR. I will certainly reuse some of the refactorings
> (and will probably rebase by the time I have this ready :))
>
>
>> Separately, folks on the Calcite dev list are thinking about multi-query
>> optimization:
>> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
>> https://issues.apache.org/jira/browse/CALCITE-6188
>
>
> Seems interesting. But Calcite's MQO approach will probably require some
> drastic changes in our codebase once we adopt it.
> This approach is more incremental.
>
> Hope my comments answer your questions.
>
> Regards,
> Jeyhun Karimov
>
> On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes <jhug...@confluent.io.invalid>
> wrote:
>
>> Hi Jeyhun,
>>
>>
>> Generally, I like the idea of speeding up the optimizer in the case of
>> multiple queries!
>>
>>
>> I am new to the optimizer, but I have a few comments / questions.
>>
>>
>>
>>    1. StreamOptimizeContext may still be needed to pass the fact that we
>>    are optimizing a streaming query.  I don't think this class will go
>>    away completely.  (I agree it may become more simple if the kind or
>>    mini-batch configuration can be removed.)
>>    2. How are the mini-batch and changelog inference rules tightly
>>    coupled?  I looked a little bit and I haven't seen any connection
>>    between them.  It seems like the changelog inference is what needs to
>>    run multiple times.
>>    3. I think your point about code complexity is unnecessary.
>>    StreamOptimizeContext extends org.apache.calcite.plan.Context which is
>>    used as an interface to pass information and objects through the
>>    Calcite stack.
>>    4. Is there an alternative where the complexity of the changelog
>>    optimization can be moved into the `FlinkChangelogModeInferenceProgram`?
>>    (If this is coupling between the mini-batch and changelog rules, then
>>    this would not make sense.)
>>    5. There are some other smaller refactorings.  I tried some of them
>>    here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
>>    and using lazy vals to avoid recomputing various things.  (Feel free to
>>    take whatever actually works; I haven't run the tests.)
>>
>> Separately, folks on the Calcite dev list are thinking about multi-query
>> optimization:
>> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
>> https://issues.apache.org/jira/browse/CALCITE-6188
>>
>> Cheers,
>>
>>
>> Jim
>>
>> On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov <je.kari...@gmail.com>
>> wrote:
>>
>> > Hi devs,
>> >
>> > I’d like to start a discussion on FLIP-419: Optimize multi-sink query
>> > plan generation [1].
>> >
>> >
>> > Currently, the optimization process for multi-sink query plans is
>> > suboptimal: 1) it requires going through the optimization process
>> > several times, and 2) as a result, some low-level code complexity is
>> > introduced into high-level optimization classes such as
>> > StreamCommonSubGraphBasedOptimizer.
>> >
>> >
>> > To address this issue, this FLIP proposes decoupling changelog and
>> > mini-batch interval inference from the main optimization process.
>> >
>> > Please find more details in the FLIP wiki document [1]. Looking forward
>> > to your feedback.
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
>> >
>> >
>> > Regards,
>> > Jeyhun Karimov
>> >
>>
>
