This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 97ac1ab  [FLINK-10638][table] Invalid table scan resolution for temporal join queries
97ac1ab is described below

commit 97ac1ab0cc513511a0b61b52663e451c0e52218b
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu Nov 1 12:19:14 2018 +0100

    [FLINK-10638][table] Invalid table scan resolution for temporal join queries
    
    Previously there was a strict, fixed order of applying LogicalCorrelateToTemporalTableJoinRule
    and TableScanRule. This was causing problems, since either of them could create new
    RelNodes that have to be subject to the other rule (imagine a deeply nested
    TemporalTableFunction that references registered tables/views and other TemporalTableFunctions).
    
    The solution to this problem is to run both of those rules in one group/collection in
    the HepPlanner. Instead of applying one rule to the whole tree and then the other rule,
    both rules are applied to a parent node before going deeper.
---
 .../flink/table/api/BatchTableEnvironment.scala    |  5 +-
 .../flink/table/api/StreamTableEnvironment.scala   |  8 +--
 .../apache/flink/table/api/TableEnvironment.scala  | 64 ++++++++++++++++------
 .../flink/table/plan/rules/FlinkRuleSets.scala     | 16 ++----
 .../api/stream/sql/TemporalTableJoinTest.scala     |  4 +-
 .../api/stream/table/TemporalTableJoinTest.scala   |  7 ++-
 .../runtime/stream/sql/TemporalJoinITCase.scala    | 10 ++--
 7 files changed, 72 insertions(+), 42 deletions(-)
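
To see why neither rule can simply run to a fixpoint before the other, here is a minimal
sketch distilled from the TemporalJoinITCase change in this diff; tEnv (a
StreamTableEnvironment), ratesHistory, and sqlQuery are assumed to be set up as in that test:

    import org.apache.flink.table.api.scala._  // expression DSL and Table-to-DataStream conversions
    import org.apache.flink.types.Row

    // A view over a registered table: expanding this reference needs TableScanRule.
    tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L))

    // A temporal table function over that view: rewriting the correlate that uses
    // "Rates" needs LogicalCorrelateToTemporalTableJoinRule, and the sub-tree it
    // splices in contains yet another table reference for TableScanRule.
    tEnv.registerFunction(
      "Rates",
      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency))

    // Scanning the registered join result stacks one more table reference on top.
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]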

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 6a7a921..99e9d7e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -449,9 +449,8 @@ abstract class BatchTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
     optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4973f34..8c6a1e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -804,13 +804,13 @@ abstract class StreamTableEnvironment(
     */
  private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val planWithMaterializedTimeAttributes =
       RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
    val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+
    val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
     optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
   }
@@ -827,7 +827,7 @@ abstract class StreamTableEnvironment(
       } else {
         relNode
       }
-      runHepPlanner(
+      runHepPlannerSequentially(
         HepMatchOrder.BOTTOM_UP,
         decoRuleSet,
         planToDecorate,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 58831d1..26f9e50 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgram, HepProgramBuilder}
 import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
@@ -256,34 +256,31 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getBuiltInPhysicalOptRuleSet: RuleSet
 
   protected def optimizeConvertSubQueries(relNode: RelNode): RelNode = {
-    runHepPlanner(
+    runHepPlannerSequentially(
       HepMatchOrder.BOTTOM_UP,
       FlinkRuleSets.TABLE_SUBQUERY_RULES,
       relNode,
       relNode.getTraitSet)
   }
 
-  protected def optimizeConvertToTemporalJoin(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TEMPORAL_JOIN_RULES,
+  protected def optimizeExpandPlan(relNode: RelNode): RelNode = {
+    val result = runHepPlannerSimultaneously(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.EXPAND_PLAN_RULES,
       relNode,
       relNode.getTraitSet)
-  }
 
-  protected def optimizeConvertTableReferences(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TABLE_REF_RULES,
-      relNode,
-      relNode.getTraitSet)
+    runHepPlannerSequentially(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES,
+      result,
+      result.getTraitSet)
   }
 
-
   protected def optimizeNormalizeLogicalPlan(relNode: RelNode): RelNode = {
     val normRuleSet = getNormRuleSet
     if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
+      runHepPlannerSequentially(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
     } else {
       relNode
     }
@@ -310,13 +307,16 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
-    * run HEP planner
+    * run HEP planner with rules applied one by one. First apply one rule to all of the nodes
+    * and only then apply the next rule. If a rule creates a new node, preceding rules will not
+    * be applied to the newly created node.
     */
-  protected def runHepPlanner(
+  protected def runHepPlannerSequentially(
     hepMatchOrder: HepMatchOrder,
     ruleSet: RuleSet,
     input: RelNode,
     targetTraits: RelTraitSet): RelNode = {
+
     val builder = new HepProgramBuilder
     builder.addMatchOrder(hepMatchOrder)
 
@@ -324,8 +324,36 @@ abstract class TableEnvironment(val config: TableConfig) {
     while (it.hasNext) {
       builder.addRuleInstance(it.next())
     }
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner with rules applied simultaneously. Apply all of the rules to the given
+    * node before going to the next one. If a rule creates a new node, all of the rules will
+    * be applied to this new node.
+    */
+  protected def runHepPlannerSimultaneously(
+    hepMatchOrder: HepMatchOrder,
+    ruleSet: RuleSet,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
+
+    val builder = new HepProgramBuilder
+    builder.addMatchOrder(hepMatchOrder)
+
+    builder.addRuleCollection(ruleSet.asScala.toList.asJava)
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner
+    */
+  protected def runHepPlanner(
+    hepProgram: HepProgram,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
 
-    val planner = new HepPlanner(builder.build, frameworkConfig.getContext)
+    val planner = new HepPlanner(hepProgram, frameworkConfig.getContext)
     planner.setRoot(input)
     if (input.getTraitSet != targetTraits) {
       planner.changeTraits(input, targetTraits.simplify)
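
For readers unfamiliar with Calcite's hep planner, a short sketch of the programs the two
helpers above end up building; the two Calcite core rules below are illustrative
placeholders only, not rules used by this commit:

    import org.apache.calcite.plan.RelOptRule
    import org.apache.calcite.plan.hep.{HepMatchOrder, HepProgramBuilder}
    import org.apache.calcite.rel.rules.{FilterMergeRule, ProjectMergeRule}
    import scala.collection.JavaConverters._

    val rules: List[RelOptRule] = List(FilterMergeRule.INSTANCE, ProjectMergeRule.INSTANCE)

    // addRuleInstance() opens one program group per rule: the first rule runs to
    // a fixpoint on the whole tree before the second ever fires, so nodes created
    // by a later rule are never revisited by an earlier group.
    val sequential = {
      val builder = new HepProgramBuilder
      builder.addMatchOrder(HepMatchOrder.BOTTOM_UP)
      rules.foreach(builder.addRuleInstance)
      builder.build()
    }

    // addRuleCollection() puts all rules into a single group: at every visited
    // node each rule may match, so a node created by one rule is immediately a
    // candidate for the others.
    val simultaneous = {
      val builder = new HepProgramBuilder
      builder.addMatchOrder(HepMatchOrder.TOP_DOWN)
      builder.addRuleCollection(rules.asJava)
      builder.build()
    }

This is the distinction that lets optimizeExpandPlan run EXPAND_PLAN_RULES as one group.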
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 6e2ccde..5e0ee32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -39,18 +39,14 @@ object FlinkRuleSets {
     SubQueryRemoveRule.JOIN)
 
   /**
-    * Handles proper conversion of correlate queries with temporal table functions
-    * into temporal table joins. This can create new table scans in the plan.
+    * Expands the plan by replacing references to tables with proper plan sub-trees. These
+    * rules can create new plan nodes.
     */
-  val TEMPORAL_JOIN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToTemporalTableJoinRule.INSTANCE
-  )
+  val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
+    LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
+    TableScanRule.INSTANCE)
 
-  /**
-    * Convert table references before query decorrelation.
-    */
-  val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
+  val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
 
   val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
index 3c47f56..27c40bb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
 
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 
'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)
 
     val sqlQuery =
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
index f8d4923..299c144 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
 
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 
'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)
 
     val result = orders
@@ -226,7 +228,8 @@ object TemporalTableJoinTest {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(2),
-            term("select", "rowtime, currency, rate, secondary_key")
+            term("select", "rowtime, currency, rate, secondary_key"),
+            term("where", ">(rate, 110)")
           ),
           term("where",
             "AND(" +
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
index df0f01b..0fb1753 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -127,8 +127,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {
 
     var expectedOutput = new mutable.HashSet[String]()
     expectedOutput += (2 * 114).toString
-    expectedOutput += (1 * 102).toString
-    expectedOutput += (50 * 1).toString
     expectedOutput += (3 * 116).toString
 
     val orders = env
@@ -142,11 +140,15 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("Orders", orders)
     tEnv.registerTable("RatesHistory", ratesHistory)
+    tEnv.registerTable("FilteredRatesHistory", 
tEnv.scan("RatesHistory").filter('rate > 110L))
     tEnv.registerFunction(
       "Rates",
-      ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 
'currency))
+    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
 
-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    // Scan from a registered table to test the interplay between
+    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
+    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
