twalthr commented on a change in pull request #17652:
URL: https://github.com/apache/flink/pull/17652#discussion_r741689692
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
##########
@@ -246,77 +260,80 @@ object FlinkBatchRuleSets {
* RuleSet to do logical optimize.
* This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
*/
- private val LOGICAL_RULES: RuleSet = RuleSets.ofList(
- // scan optimization
- PushProjectIntoTableSourceScanRule.INSTANCE,
- PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
- PushFilterIntoTableSourceScanRule.INSTANCE,
- PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
-
- // reorder sort and projection
- CoreRules.SORT_PROJECT_TRANSPOSE,
- // remove unnecessary sort rule
- CoreRules.SORT_REMOVE,
-
- // join rules
- FlinkJoinPushExpressionsRule.INSTANCE,
- SimplifyJoinConditionRule.INSTANCE,
-
- // remove union with only a single child
- CoreRules.UNION_REMOVE,
- // convert non-all union into all-union + distinct
- CoreRules.UNION_TO_DISTINCT,
-
- // aggregation and projection rules
- CoreRules.AGGREGATE_PROJECT_MERGE,
- CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS,
-
- // remove aggregation if it does not aggregate and input is already distinct
- FlinkAggregateRemoveRule.INSTANCE,
- // push aggregate through join
- FlinkAggregateJoinTransposeRule.EXTENDED,
- // aggregate union rule
- CoreRules.AGGREGATE_UNION_AGGREGATE,
- // expand distinct aggregate to normal aggregate with groupby
- FlinkAggregateExpandDistinctAggregatesRule.INSTANCE,
-
- // reduce aggregate functions like AVG, STDDEV_POP etc.
- CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
- WindowAggregateReduceFunctionsRule.INSTANCE,
-
- // reduce group by columns
- AggregateReduceGroupingRule.INSTANCE,
- // reduce useless aggCall
- PruneAggregateCallRule.PROJECT_ON_AGGREGATE,
- PruneAggregateCallRule.CALC_ON_AGGREGATE,
-
- // expand grouping sets
- DecomposeGroupingSetsRule.INSTANCE,
-
- // rank rules
- FlinkLogicalRankRule.CONSTANT_RANGE_INSTANCE,
- // transpose calc past rank to reduce rank input fields
- CalcRankTransposeRule.INSTANCE,
- // remove output of rank number when it is a constant
- ConstantRankNumberColumnRemoveRule.INSTANCE,
-
- // calc rules
- CoreRules.FILTER_CALC_MERGE,
- CoreRules.PROJECT_CALC_MERGE,
- CoreRules.FILTER_TO_CALC,
- CoreRules.PROJECT_TO_CALC,
- FlinkCalcMergeRule.INSTANCE,
-
- // semi/anti join transpose rule
- FlinkSemiAntiJoinJoinTransposeRule.INSTANCE,
- FlinkSemiAntiJoinProjectTransposeRule.INSTANCE,
- FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,
-
- // set operators
- ReplaceIntersectWithSemiJoinRule.INSTANCE,
- RewriteIntersectAllRule.INSTANCE,
- ReplaceMinusWithAntiJoinRule.INSTANCE,
- RewriteMinusAllRule.INSTANCE
+ private val LOGICAL_RULES: RuleSet = RuleSets.ofList((
+ RuleSets.ofList(
Review comment:
Introduce constants for the other lists as well to improve readability.
In the end, this should be only a concatenation of constants instead of this
hybrid/nested approach.
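A minimal sketch of the shape I have in mind. The constant names are hypothetical, and plain strings stand in for the actual rule instances so that the snippet is self-contained:

```scala
// Sketch only: strings stand in for the real rule instances,
// and all constant names below are hypothetical.
object RuleSetSketch {
  // One named constant per logical group of rules.
  val ScanRules: List[String] = List(
    "PushProjectIntoTableSourceScanRule",
    "PushFilterIntoTableSourceScanRule")

  val SortRules: List[String] = List(
    "SORT_PROJECT_TRANSPOSE",
    "SORT_REMOVE")

  val JoinRules: List[String] = List(
    "FlinkJoinPushExpressionsRule",
    "SimplifyJoinConditionRule")

  // The final list is nothing but a concatenation of the named constants.
  val LogicalRules: List[String] = ScanRules ++ SortRules ++ JoinRules
}
```

In the planner the lists would hold the rule instances, and the concatenated result would presumably be passed to `RuleSets.ofList` as the existing code does.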
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
"b" : "INT"
} ]
},
- "description" : "TableSourceScan(table=[[default_catalog, default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a, b], metadata=[]]], fields=[a, b])",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, PartitionTable, partitions=[{p=A}], project=[a, b], metadata=[]]], fields=[a, b])",
Review comment:
Is the `toString` now consistent everywhere? First `partitions`, then
`filter`, then `project`, then `metadata`. This is the order in which a source
should work.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
"b" : "INT"
} ]
},
- "description" : "TableSourceScan(table=[[default_catalog, default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a, b], metadata=[]]], fields=[a, b])",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, PartitionTable, partitions=[{p=A}], project=[a, b], metadata=[]]], fields=[a, b])",
Review comment:
> specs will be print each by the order in which they added to the table source
My question was rather whether the print order of the specs is consistent
everywhere. But I don't think we need more tests for now. Thanks.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSourceJsonPlanTest_jsonplan/testPartitionPushDown.out
##########
@@ -64,7 +61,7 @@
"b" : "INT"
} ]
},
- "description" : "TableSourceScan(table=[[default_catalog, default_database, PartitionTable, filter=[], partitions=[{p=A}], project=[a, b], metadata=[]]], fields=[a, b])",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, PartitionTable, partitions=[{p=A}], project=[a, b], metadata=[]]], fields=[a, b])",
Review comment:
Watermarks and filters are a tricky topic anyway: if the filter is too
selective, event time will make almost no progress anymore. Let's open a
follow-up issue for this. I think having a clear spec order could be
beneficial.
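To illustrate what a clear spec order could mean for the printed description, here is a rough sketch. The object, the `describe` helper, and the representation of specs as name/value pairs are all hypothetical and not the planner's actual classes. It renders the specs in one fixed order, regardless of the order in which they were added:

```scala
// Sketch only: specs are modelled as (name, value) pairs, not as the real spec classes.
object SpecOrderSketch {
  // Fixed order taken from the earlier comment: partitions, filter, project, metadata.
  val CanonicalOrder: List[String] = List("partitions", "filter", "project", "metadata")

  // Render specs in canonical order. Unknown spec names are placed after the
  // known ones and keep their relative order, because sortBy is a stable sort.
  def describe(specs: Seq[(String, String)]): String = {
    def rank(name: String): Int = {
      val i = CanonicalOrder.indexOf(name)
      if (i >= 0) i else CanonicalOrder.size
    }
    specs
      .sortBy { case (name, _) => rank(name) }
      .map { case (name, value) => s"$name=[$value]" }
      .mkString(", ")
  }
}
```

With this, `SpecOrderSketch.describe(Seq("project" -> "a, b", "partitions" -> "{p=A}", "metadata" -> ""))` yields `partitions=[{p=A}], project=[a, b], metadata=[]`, independent of the insertion order.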