xuyangzhong commented on a change in pull request #17652: URL: https://github.com/apache/flink/pull/17652#discussion_r742532909
########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala ########## @@ -246,77 +260,80 @@ object FlinkBatchRuleSets { * RuleSet to do logical optimize. * This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]]. */ - private val LOGICAL_RULES: RuleSet = RuleSets.ofList( - // scan optimization - PushProjectIntoTableSourceScanRule.INSTANCE, - PushProjectIntoLegacyTableSourceScanRule.INSTANCE, - PushFilterIntoTableSourceScanRule.INSTANCE, - PushFilterIntoLegacyTableSourceScanRule.INSTANCE, - - // reorder sort and projection - CoreRules.SORT_PROJECT_TRANSPOSE, - // remove unnecessary sort rule - CoreRules.SORT_REMOVE, - - // join rules - FlinkJoinPushExpressionsRule.INSTANCE, - SimplifyJoinConditionRule.INSTANCE, - - // remove union with only a single child - CoreRules.UNION_REMOVE, - // convert non-all union into all-union + distinct - CoreRules.UNION_TO_DISTINCT, - - // aggregation and projection rules - CoreRules.AGGREGATE_PROJECT_MERGE, - CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS, - - // remove aggregation if it does not aggregate and input is already distinct - FlinkAggregateRemoveRule.INSTANCE, - // push aggregate through join - FlinkAggregateJoinTransposeRule.EXTENDED, - // aggregate union rule - CoreRules.AGGREGATE_UNION_AGGREGATE, - // expand distinct aggregate to normal aggregate with groupby - FlinkAggregateExpandDistinctAggregatesRule.INSTANCE, - - // reduce aggregate functions like AVG, STDDEV_POP etc. - CoreRules.AGGREGATE_REDUCE_FUNCTIONS, - WindowAggregateReduceFunctionsRule.INSTANCE, - - // reduce group by columns - AggregateReduceGroupingRule.INSTANCE, - // reduce useless aggCall - PruneAggregateCallRule.PROJECT_ON_AGGREGATE, - PruneAggregateCallRule.CALC_ON_AGGREGATE, - - // expand grouping sets - DecomposeGroupingSetsRule.INSTANCE, - - // rank rules - FlinkLogicalRankRule.CONSTANT_RANGE_INSTANCE, - // transpose calc past rank to reduce rank input fields - CalcRankTransposeRule.INSTANCE, - // remove output of rank number when it is a constant - ConstantRankNumberColumnRemoveRule.INSTANCE, - - // calc rules - CoreRules.FILTER_CALC_MERGE, - CoreRules.PROJECT_CALC_MERGE, - CoreRules.FILTER_TO_CALC, - CoreRules.PROJECT_TO_CALC, - FlinkCalcMergeRule.INSTANCE, - - // semi/anti join transpose rule - FlinkSemiAntiJoinJoinTransposeRule.INSTANCE, - FlinkSemiAntiJoinProjectTransposeRule.INSTANCE, - FlinkSemiAntiJoinFilterTransposeRule.INSTANCE, - - // set operators - ReplaceIntersectWithSemiJoinRule.INSTANCE, - RewriteIntersectAllRule.INSTANCE, - ReplaceMinusWithAntiJoinRule.INSTANCE, - RewriteMinusAllRule.INSTANCE + private val LOGICAL_RULES: RuleSet = RuleSets.ofList(( + RuleSets.ofList( Review comment: I will reset the code to keep the previous format here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org