This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4101316ef37 [FLINK-36417][table] Add support for hints in 
WatermarkAssigner
4101316ef37 is described below

commit 4101316ef37d3a7082bff57cb732f4e9e0349d09
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Thu Oct 3 21:32:16 2024 +0200

    [FLINK-36417][table] Add support for hints in WatermarkAssigner
---
 .../table/planner/calcite/FlinkRelBuilder.java     |  3 +-
 .../flink/table/planner/hint/FlinkHints.java       |  4 +-
 .../logical/EventTimeTemporalJoinRewriteRule.java  |  6 +-
 ...arkAssignerChangelogNormalizeTransposeRule.java |  1 +
 .../nodes/calcite/LogicalWatermarkAssigner.scala   | 16 ++++-
 .../plan/nodes/calcite/WatermarkAssigner.scala     | 22 +++++-
 .../logical/FlinkLogicalWatermarkAssigner.scala    | 27 +++++++-
 .../stream/StreamPhysicalWatermarkAssigner.scala   | 19 +++++-
 .../StreamPhysicalWatermarkAssignerRule.scala      |  3 +
 .../plan/hints/stream/StateTtlHintTest.java        | 45 +++++++++++++
 .../planner/plan/hints/stream/StateTtlHintTest.xml | 78 ++++++++++++++++++++++
 .../ProjectWatermarkAssignerTransposeRuleTest.xml  |  4 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  1 +
 13 files changed, 213 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 71129e26d65..60a0c897eee 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -253,7 +253,8 @@ public final class FlinkRelBuilder extends RelBuilder {
     public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
         final RelNode input = build();
         final RelNode relNode =
-                LogicalWatermarkAssigner.create(cluster, input, 
rowtimeFieldIndex, watermarkExpr);
+                LogicalWatermarkAssigner.create(
+                        cluster, input, Collections.emptyList(), 
rowtimeFieldIndex, watermarkExpr);
         return push(relNode);
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index 18f13d75472..bcbc1dca533 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.hint;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner;
 import 
org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
 import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
 
@@ -119,7 +120,8 @@ public abstract class FlinkHints {
     public static boolean canTransposeToTableScan(RelNode node) {
         return node instanceof LogicalProject // computed column on table
                 || node instanceof LogicalFilter
-                || node instanceof LogicalSnapshot;
+                || node instanceof LogicalSnapshot
+                || node instanceof WatermarkAssigner;
     }
 
     /** Returns the qualified name of a table scan, otherwise returns empty. */
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
index 89feca8a2b6..64df95c979e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java
@@ -140,7 +140,11 @@ public class EventTimeTemporalJoinRewriteRule
             final RelNode newChild = transmitSnapshotRequirement(child);
             if (newChild != child) {
                 return wma.copy(
-                        wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), 
wma.watermarkExpr());
+                        wma.getTraitSet(),
+                        newChild,
+                        wma.getHints(),
+                        wma.rowtimeFieldIndex(),
+                        wma.watermarkExpr());
             }
             return wma;
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
index 15d8f485994..51963d79eb3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java
@@ -288,6 +288,7 @@ public class 
WatermarkAssignerChangelogNormalizeTransposeRule
                 watermark.copy(
                         
watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
                         exchange.getInput(),
+                        Collections.emptyList(),
                         newRowTimeFieldIndex,
                         newWatermarkExpr);
         final RelNode newChangelogNormalize =
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
index 89a9cb7e3c4..c9d3443f526 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala
@@ -19,8 +19,11 @@ package org.apache.flink.table.planner.plan.nodes.calcite
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rex.RexNode
 
+import java.util
+
 /**
  * Sub-class of [[WatermarkAssigner]] that is a relational operator which 
generates
  * [[org.apache.flink.streaming.api.watermark.Watermark]]. This class 
corresponds to Calcite logical
@@ -30,16 +33,22 @@ final class LogicalWatermarkAssigner(
     cluster: RelOptCluster,
     traits: RelTraitSet,
     input: RelNode,
+    hints: util.List[RelHint],
     rowtimeFieldIndex: Int,
     watermarkExpr: RexNode)
-  extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, 
watermarkExpr) {
+  extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, 
watermarkExpr) {
 
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
+      hints: util.List[RelHint],
       rowtime: Int,
       watermark: RexNode): RelNode = {
-    new LogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark)
+    new LogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, 
watermark)
+  }
+
+  override def withHints(hintList: util.List[RelHint]): RelNode = {
+    new LogicalWatermarkAssigner(cluster, traits, input, hintList, 
rowtimeFieldIndex, watermarkExpr)
   }
 }
 
@@ -48,9 +57,10 @@ object LogicalWatermarkAssigner {
   def create(
       cluster: RelOptCluster,
       input: RelNode,
+      hints: util.List[RelHint],
       rowtimeFieldIndex: Int,
       watermarkExpr: RexNode): LogicalWatermarkAssigner = {
     val traits = cluster.traitSetOf(Convention.NONE)
-    new LogicalWatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, 
watermarkExpr)
+    new LogicalWatermarkAssigner(cluster, traits, input, hints, 
rowtimeFieldIndex, watermarkExpr)
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
index e4baf13567a..58137c195ad 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala
@@ -19,13 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.calcite
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 
+import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.hint.{Hintable, RelHint}
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName
 
 import java.util
+import java.util.ArrayList
 
 import scala.collection.JavaConversions._
 
@@ -34,9 +37,11 @@ abstract class WatermarkAssigner(
     cluster: RelOptCluster,
     traits: RelTraitSet,
     inputRel: RelNode,
+    val hints: util.List[RelHint],
     val rowtimeFieldIndex: Int,
     val watermarkExpr: RexNode)
-  extends SingleRel(cluster, traits, inputRel) {
+  extends SingleRel(cluster, traits, inputRel)
+  with Hintable {
 
   override def deriveRowType(): RelDataType = {
     val inputRowType = inputRel.getRowType
@@ -68,10 +73,21 @@ abstract class WatermarkAssigner(
   }
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
-    copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
+    copy(traitSet, inputs.get(0), hints, rowtimeFieldIndex, watermarkExpr)
   }
 
   /** Copies a new WatermarkAssigner. */
-  def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: 
RexNode): RelNode
+  def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      hints: util.List[RelHint],
+      rowtime: Int,
+      watermark: RexNode): RelNode
 
+  override def getHints: ImmutableList[RelHint] = {
+    val arrayHints = hints.toArray(new Array[RelHint](0))
+    ImmutableList.copyOf(arrayHints)
+  }
+
+  def withHints(hintList: util.List[RelHint]): RelNode
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
index 795304a9f38..7468c83142f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
@@ -24,8 +24,12 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.convert.ConverterRule.Config
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rex.RexNode
 
+import java.util
+import java.util.Collections
+
 /**
  * Sub-class of [[WatermarkAssigner]] that is a relational operator which 
generates
  * [[org.apache.flink.streaming.api.watermark.Watermark]].
@@ -34,20 +38,31 @@ class FlinkLogicalWatermarkAssigner(
     cluster: RelOptCluster,
     traits: RelTraitSet,
     input: RelNode,
+    hints: util.List[RelHint],
     rowtimeFieldIndex: Int,
     watermarkExpr: RexNode)
-  extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, 
watermarkExpr)
+  extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, 
watermarkExpr)
   with FlinkLogicalRel {
 
   /** Copies a new WatermarkAssigner. */
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
+      hints: util.List[RelHint],
       rowtime: Int,
       watermark: RexNode): RelNode = {
-    new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtime, 
watermark)
+    new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, hints, 
rowtime, watermark)
   }
 
+  override def withHints(hintList: util.List[RelHint]): RelNode = {
+    new FlinkLogicalWatermarkAssigner(
+      cluster,
+      traitSet,
+      input,
+      hints,
+      rowtimeFieldIndex,
+      watermarkExpr)
+  }
 }
 
 class FlinkLogicalWatermarkAssignerConverter(config: Config) extends 
ConverterRule(config) {
@@ -76,6 +91,12 @@ object FlinkLogicalWatermarkAssigner {
       watermarkExpr: RexNode): FlinkLogicalWatermarkAssigner = {
     val cluster = input.getCluster
     val traitSet = 
cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
-    new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, 
rowtimeFieldIndex, watermarkExpr)
+    new FlinkLogicalWatermarkAssigner(
+      cluster,
+      traitSet,
+      input,
+      Collections.emptyList(),
+      rowtimeFieldIndex,
+      watermarkExpr)
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
index 69973a8e6a3..f9c7ebb6861 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala
@@ -26,8 +26,11 @@ import 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rex.RexNode
 
+import java.util
+
 import scala.collection.JavaConversions._
 
 /** Stream physical RelNode for [[WatermarkAssigner]]. */
@@ -35,9 +38,10 @@ class StreamPhysicalWatermarkAssigner(
     cluster: RelOptCluster,
     traits: RelTraitSet,
     inputRel: RelNode,
+    hints: util.List[RelHint],
     rowtimeFieldIndex: Int,
     watermarkExpr: RexNode)
-  extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex, 
watermarkExpr)
+  extends WatermarkAssigner(cluster, traits, inputRel, hints, 
rowtimeFieldIndex, watermarkExpr)
   with StreamPhysicalRel {
 
   override def requireWatermark: Boolean = false
@@ -45,9 +49,10 @@ class StreamPhysicalWatermarkAssigner(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
+      hints: util.List[RelHint],
       rowtime: Int,
       watermark: RexNode): RelNode = {
-    new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, rowtime, 
watermark)
+    new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, hints, 
rowtime, watermark)
   }
 
   /** Fully override this method to have a better display name of this 
RelNode. */
@@ -75,4 +80,14 @@ class StreamPhysicalWatermarkAssigner(
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
+
+  override def withHints(hintList: util.List[RelHint]): RelNode = {
+    new StreamPhysicalWatermarkAssigner(
+      cluster,
+      traitSet,
+      input,
+      hints,
+      rowtimeFieldIndex,
+      watermarkExpr)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
index c6b62b0d09e..ba2e10f99d4 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.convert.ConverterRule.Config
 
+import java.util.Collections
+
 /** Rule that converts [[FlinkLogicalWatermarkAssigner]] to 
[[StreamPhysicalWatermarkAssigner]]. */
 class StreamPhysicalWatermarkAssignerRule(config: Config) extends 
ConverterRule(config) {
 
@@ -39,6 +41,7 @@ class StreamPhysicalWatermarkAssignerRule(config: Config) 
extends ConverterRule(
       watermarkAssigner.getCluster,
       traitSet,
       convertInput,
+      Collections.emptyList(),
       watermarkAssigner.rowtimeFieldIndex,
       watermarkAssigner.watermarkExpr
     )
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
index 674fdc602ba..d23f440b51b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
@@ -66,6 +66,31 @@ class StateTtlHintTest extends TableTestBase {
                                 + " 'connector' = 'values'\n"
                                 + ")");
 
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE tableWithWatermark1 (\n"
+                                + "  a INT,\n"
+                                + "  b BIGINT,\n"
+                                + "  c TIMESTAMP(3),"
+                                + "  WATERMARK FOR c AS c"
+                                + ") WITH ("
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'false'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE tableWithWatermark2 ("
+                                + "  a int,\n"
+                                + "  b BIGINT,\n"
+                                + "  c ROW<c1 TIMESTAMP(3)>,\n"
+                                + "  d AS c.c1 + INTERVAL '5' SECOND,\n"
+                                + "  WATERMARK FOR d as d - INTERVAL '5' 
second"
+                                + ") WITH ("
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'false'\n"
+                                + ")");
+
         util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as 
b4 from T3");
 
         util.tableEnv()
@@ -297,6 +322,26 @@ class StateTtlHintTest extends TableTestBase {
                         "Invalid STATE_TTL hint, expecting at least one 
key-value options specified.");
     }
 
+    @Test
+    void testWatermarkAssigner() {
+        String sql =
+                "\n"
+                        + "SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 
'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1\n"
+                        + "LEFT JOIN(SELECT DISTINCT b FROM 
tableWithWatermark2) tww2\n"
+                        + "ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS 
NOT NULL";
+        verify(sql);
+    }
+
+    @Test
+    void testWatermarkAssignerWithAliases() {
+        String sql =
+                "\n"
+                        + "SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ 
tww1.* FROM tableWithWatermark1 tww1\n"
+                        + "LEFT JOIN(SELECT DISTINCT b FROM 
tableWithWatermark2) tww2\n"
+                        + "ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL";
+        verify(sql);
+    }
+
     private void verify(String sql) {
         util.doVerifyPlan(
                 sql,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
index b395e6002b2..8b1a70096f7 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
@@ -494,4 +494,82 @@ GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS 
EXPR$1])
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testWatermarkAssigner">
+               <Resource name="sql">
+                       <![CDATA[
+SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */ 
tableWithWatermark1.* FROM tableWithWatermark1
+LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2
+ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IS NOT NULL($3)])
+   +- LogicalJoin(condition=[=($1, $3)], joinType=[left], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, 
tableWithWatermark1=1d}]]])
+      :- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS 
options:[tableWithWatermark1]]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithWatermark1]])
+      +- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]])
+         +- LogicalProject(b=[$1])
+            +- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 
5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 
5000:INTERVAL SECOND)])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, tableWithWatermark2]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Calc(select=[a, b, c])
++- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
+   :- Exchange(distribution=[hash[b]])
+   :  +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT 
NULL(b)])
+   :     +- WatermarkAssigner(rowtime=[c], watermark=[c])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithWatermark1]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[b]])
+      +- GroupAggregate(groupBy=[b], select=[b])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b], where=[IS NOT NULL(b)])
+               +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL 
SECOND)])
+                  +- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, tableWithWatermark2, project=[b, c], metadata=[]]], 
fields=[b, c])
+]]>
+               </Resource>
+       </TestCase>
+  <TestCase name="testWatermarkAssignerWithAliases">
+               <Resource name="sql">
+                       <![CDATA[
+SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM 
tableWithWatermark1 tww1
+LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2
+ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IS NOT NULL($3)])
+   +- LogicalJoin(condition=[=($1, $3)], joinType=[left], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, tww1=1d}]]])
+      :- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS 
options:[tww1]]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, 
tableWithWatermark1]])
+      +- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]])
+         +- LogicalProject(b=[$1])
+            +- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 
5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 
5000:INTERVAL SECOND)])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, tableWithWatermark2]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Calc(select=[a, b, c])
++- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]])
+   :- Exchange(distribution=[hash[b]])
+   :  +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT 
NULL(b)])
+   :     +- WatermarkAssigner(rowtime=[c], watermark=[c])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
tableWithWatermark1]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[b]])
+      +- GroupAggregate(groupBy=[b], select=[b])
+         +- Exchange(distribution=[hash[b]])
+            +- Calc(select=[b], where=[IS NOT NULL(b)])
+               +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL 
SECOND)])
+                  +- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, tableWithWatermark2, project=[b, c], metadata=[]]], 
fields=[b, c])
+]]>
+               </Resource>
+       </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
index 261f438fb83..a584fbbe866 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml
@@ -181,8 +181,8 @@ LogicalProject(a=[$0])
    +- LogicalProject(a=[$0], b=[$1], c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
UdfTable]])
 ]]>
-    </Resource>
-  </TestCase>
+               </Resource>
+       </TestCase>
   <TestCase name="transposeWithIncludeComputedRowTime">
     <Resource name="sql">
       <![CDATA[SELECT a, b, d FROM VirtualTable]]>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 832ab36255a..ba57c2bca3a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1321,6 +1321,7 @@ case class StreamTableTestUtil(
       sourceRel.getCluster,
       sourceRel.getTraitSet,
       sourceRel,
+      Collections.emptyList(),
       rowtimeFieldIdx,
       expr
     )

Reply via email to