This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 97ac1ab  [FLINK-10638][table] Invalid table scan resolution for temporal join queries
97ac1ab is described below

commit 97ac1ab0cc513511a0b61b52663e451c0e52218b
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu Nov 1 12:19:14 2018 +0100

    [FLINK-10638][table] Invalid table scan resolution for temporal join queries

    Previously there was a strict, fixed order of applying the
    LogicalCorrelateToTemporalTableJoinRule and TableScanRule rules. This was
    causing problems, since either of them could create new RelNodes that had
    to be subject to the other rule (imagine a deeply nested
    TemporalTableFunction that references registered tables/views and other
    TemporalTableFunctions).

    The solution to this problem is to run both of those rules in one
    group/collection in the HepPlanner. Instead of applying one rule to the
    whole tree and then the other rule, both rules are applied to a parent
    node before going down/deeper.
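For readers less familiar with Calcite's HepPlanner, the key difference is how the HepProgram is assembled: addRuleInstance creates one instruction per rule, while addRuleCollection puts the whole group into a single instruction. Below is a minimal sketch of the two variants, assuming only Calcite on the classpath; HepProgramSketch, sequentialProgram, and simultaneousProgram are illustrative names, not part of this commit, standing in for what the new runHepPlannerSequentially and runHepPlannerSimultaneously helpers build internally.

    import scala.collection.JavaConverters._

    import org.apache.calcite.plan.RelOptRule
    import org.apache.calcite.plan.hep.{HepMatchOrder, HepProgram, HepProgramBuilder}

    // Illustrative sketch, not part of the commit.
    object HepProgramSketch {

      // One instruction per rule: the planner exhausts the first rule on the
      // whole tree before the next rule ever runs, so nodes created by a
      // later rule are never revisited by an earlier one.
      def sequentialProgram(rules: List[RelOptRule]): HepProgram = {
        val builder = new HepProgramBuilder
        builder.addMatchOrder(HepMatchOrder.TOP_DOWN)
        rules.foreach(rule => builder.addRuleInstance(rule))
        builder.build()
      }

      // One instruction for the whole group: at each node the planner tries
      // every rule in the collection before descending, and nodes created by
      // one rule are matched against all rules of the group again.
      def simultaneousProgram(rules: List[RelOptRule]): HepProgram = {
        val builder = new HepProgramBuilder
        builder.addMatchOrder(HepMatchOrder.TOP_DOWN)
        builder.addRuleCollection(rules.asJava)
        builder.build()
      }
    }

With the rules in one collection, expanding a correlate can immediately be followed by resolving the table scan it uncovers (and vice versa), which is exactly what the deeply nested case described above requires.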
---
 .../flink/table/api/BatchTableEnvironment.scala    |  5 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  8 +--
 .../apache/flink/table/api/TableEnvironment.scala  | 64 ++++++++++++++++------
 .../flink/table/plan/rules/FlinkRuleSets.scala     | 16 ++----
 .../api/stream/sql/TemporalTableJoinTest.scala     |  4 +-
 .../api/stream/table/TemporalTableJoinTest.scala   |  7 ++-
 .../runtime/stream/sql/TemporalJoinITCase.scala    | 10 ++--
 7 files changed, 72 insertions(+), 42 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 6a7a921..99e9d7e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -449,9 +449,8 @@ abstract class BatchTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
     optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4973f34..8c6a1e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -804,13 +804,13 @@ abstract class StreamTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val planWithMaterializedTimeAttributes =
       RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
     val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+
     val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
     optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
   }
@@ -827,7 +827,7 @@ abstract class StreamTableEnvironment(
     } else {
       relNode
     }
-    runHepPlanner(
+    runHepPlannerSequentially(
       HepMatchOrder.BOTTOM_UP,
       decoRuleSet,
       planToDecorate,

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 58831d1..26f9e50 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgram, HepProgramBuilder}
 import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
@@ -256,34 +256,31 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getBuiltInPhysicalOptRuleSet: RuleSet

   protected def optimizeConvertSubQueries(relNode: RelNode): RelNode = {
-    runHepPlanner(
+    runHepPlannerSequentially(
       HepMatchOrder.BOTTOM_UP,
       FlinkRuleSets.TABLE_SUBQUERY_RULES,
       relNode,
       relNode.getTraitSet)
   }

-  protected def optimizeConvertToTemporalJoin(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TEMPORAL_JOIN_RULES,
+  protected def optimizeExpandPlan(relNode: RelNode): RelNode = {
+    val result = runHepPlannerSimultaneously(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.EXPAND_PLAN_RULES,
       relNode,
       relNode.getTraitSet)
-  }

-  protected def optimizeConvertTableReferences(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TABLE_REF_RULES,
-      relNode,
-      relNode.getTraitSet)
+    runHepPlannerSequentially(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES,
+      result,
+      result.getTraitSet)
   }
-
   protected def optimizeNormalizeLogicalPlan(relNode: RelNode): RelNode = {
     val normRuleSet = getNormRuleSet
     if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
+      runHepPlannerSequentially(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
     } else {
       relNode
     }
@@ -310,13 +307,16 @@ abstract class TableEnvironment(val config: TableConfig) {
   }

   /**
-    * run HEP planner
+    * run HEP planner with rules applied one by one. First apply one rule to all of the nodes
+    * and only then apply the next rule. If a rule creates a new node preceding rules will not
+    * be applied to the newly created node.
    */
-  protected def runHepPlanner(
+  protected def runHepPlannerSequentially(
     hepMatchOrder: HepMatchOrder,
     ruleSet: RuleSet,
     input: RelNode,
     targetTraits: RelTraitSet): RelNode = {
+
     val builder = new HepProgramBuilder
     builder.addMatchOrder(hepMatchOrder)

@@ -324,8 +324,36 @@ abstract class TableEnvironment(val config: TableConfig) {
     while (it.hasNext) {
       builder.addRuleInstance(it.next())
     }
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner with rules applied simultaneously. Apply all of the rules to the given
+    * node before going to the next one. If a rule creates a new node all of the rules will
+    * be applied to this new node.
+    */
+  protected def runHepPlannerSimultaneously(
+    hepMatchOrder: HepMatchOrder,
+    ruleSet: RuleSet,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
+
+    val builder = new HepProgramBuilder
+    builder.addMatchOrder(hepMatchOrder)
+
+    builder.addRuleCollection(ruleSet.asScala.toList.asJava)
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner
+    */
+  protected def runHepPlanner(
+    hepProgram: HepProgram,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {

-    val planner = new HepPlanner(builder.build, frameworkConfig.getContext)
+    val planner = new HepPlanner(hepProgram, frameworkConfig.getContext)
     planner.setRoot(input)
     if (input.getTraitSet != targetTraits) {
       planner.changeTraits(input, targetTraits.simplify)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 6e2ccde..5e0ee32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -39,18 +39,14 @@ object FlinkRuleSets {
     SubQueryRemoveRule.JOIN)

   /**
-    * Handles proper conversion of correlate queries with temporal table functions
-    * into temporal table joins. This can create new table scans in the plan.
+    * Expand plan by replacing references to tables into a proper plan sub trees. Those rules
+    * can create new plan nodes.
     */
-  val TEMPORAL_JOIN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToTemporalTableJoinRule.INSTANCE
-  )
+  val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
+    LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
+    TableScanRule.INSTANCE)

-  /**
-    * Convert table references before query decorrelation.
-    */
-  val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
+  val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)

   val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
index 3c47f56..27c40bb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)

     val sqlQuery =

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
index f8d4923..299c144 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)

     val result = orders
@@ -226,7 +228,8 @@ object TemporalTableJoinTest {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(2),
-        term("select", "rowtime, currency, rate, secondary_key")
+        term("select", "rowtime, currency, rate, secondary_key"),
+        term("where", ">(rate, 110)")
       ),
       term("where",
         "AND(" +

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
index df0f01b..0fb1753 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -127,8 +127,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {

     var expectedOutput = new mutable.HashSet[String]()
     expectedOutput += (2 * 114).toString
-    expectedOutput += (1 * 102).toString
-    expectedOutput += (50 * 1).toString
     expectedOutput += (3 * 116).toString

     val orders = env
@@ -142,11 +140,15 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {

     tEnv.registerTable("Orders", orders)
     tEnv.registerTable("RatesHistory", ratesHistory)
+    tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L))
     tEnv.registerFunction(
       "Rates",
-      ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency))
+    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    // Scan from registered table to test for interplay between
+    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
+    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
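As a closing note on the ITCase change: each layer of table registration hides part of the plan behind a catalog reference, so the two rules must hand work to each other. The recap below reuses the test's own names and setup (tEnv, sqlQuery, and the Flink table API implicits from the file above); only the comments are editorial.

    // The temporal table function is defined over a *registered* table, so
    // expanding the correlate that calls "Rates"
    // (LogicalCorrelateToTemporalTableJoinRule) inlines a plan that still
    // holds a catalog reference to "FilteredRatesHistory", which only
    // TableScanRule can resolve.
    tEnv.registerFunction(
      "Rates",
      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency))

    // Conversely, scanning the registered join result means TableScanRule
    // expands "TemporalJoinResult" into a plan that still holds the
    // unexpanded correlate, which only the temporal-join rule can handle.
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]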