This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4101316ef37 [FLINK-36417][table] Add support for hints in WatermarkAssigner 4101316ef37 is described below commit 4101316ef37d3a7082bff57cb732f4e9e0349d09 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Thu Oct 3 21:32:16 2024 +0200 [FLINK-36417][table] Add support for hints in WatermarkAssigner --- .../table/planner/calcite/FlinkRelBuilder.java | 3 +- .../flink/table/planner/hint/FlinkHints.java | 4 +- .../logical/EventTimeTemporalJoinRewriteRule.java | 6 +- ...arkAssignerChangelogNormalizeTransposeRule.java | 1 + .../nodes/calcite/LogicalWatermarkAssigner.scala | 16 ++++- .../plan/nodes/calcite/WatermarkAssigner.scala | 22 +++++- .../logical/FlinkLogicalWatermarkAssigner.scala | 27 +++++++- .../stream/StreamPhysicalWatermarkAssigner.scala | 19 +++++- .../StreamPhysicalWatermarkAssignerRule.scala | 3 + .../plan/hints/stream/StateTtlHintTest.java | 45 +++++++++++++ .../planner/plan/hints/stream/StateTtlHintTest.xml | 78 ++++++++++++++++++++++ .../ProjectWatermarkAssignerTransposeRuleTest.xml | 4 +- .../flink/table/planner/utils/TableTestBase.scala | 1 + 13 files changed, 213 insertions(+), 16 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java index 71129e26d65..60a0c897eee 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java @@ -253,7 +253,8 @@ public final class FlinkRelBuilder extends RelBuilder { public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) { final RelNode input = build(); final RelNode relNode = - LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr); + 
LogicalWatermarkAssigner.create( + cluster, input, Collections.emptyList(), rowtimeFieldIndex, watermarkExpr); return push(relNode); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java index 18f13d75472..bcbc1dca533 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.hint; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner; import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule; import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; @@ -119,7 +120,8 @@ public abstract class FlinkHints { public static boolean canTransposeToTableScan(RelNode node) { return node instanceof LogicalProject // computed column on table || node instanceof LogicalFilter - || node instanceof LogicalSnapshot; + || node instanceof LogicalSnapshot + || node instanceof WatermarkAssigner; } /** Returns the qualified name of a table scan, otherwise returns empty. 
*/ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java index 89feca8a2b6..64df95c979e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/EventTimeTemporalJoinRewriteRule.java @@ -140,7 +140,11 @@ public class EventTimeTemporalJoinRewriteRule final RelNode newChild = transmitSnapshotRequirement(child); if (newChild != child) { return wma.copy( - wma.getTraitSet(), newChild, wma.rowtimeFieldIndex(), wma.watermarkExpr()); + wma.getTraitSet(), + newChild, + wma.getHints(), + wma.rowtimeFieldIndex(), + wma.watermarkExpr()); } return wma; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java index 15d8f485994..51963d79eb3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java @@ -288,6 +288,7 @@ public class WatermarkAssignerChangelogNormalizeTransposeRule watermark.copy( watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()), exchange.getInput(), + Collections.emptyList(), newRowTimeFieldIndex, newWatermarkExpr); final RelNode newChangelogNormalize = diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala index 89a9cb7e3c4..c9d3443f526 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWatermarkAssigner.scala @@ -19,8 +19,11 @@ package org.apache.flink.table.planner.plan.nodes.calcite import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode +import java.util + /** * Sub-class of [[WatermarkAssigner]] that is a relational operator which generates * [[org.apache.flink.streaming.api.watermark.Watermark]]. This class corresponds to Calcite logical @@ -30,16 +33,22 @@ final class LogicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode) - extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) { + extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) { override def copy( traitSet: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtime: Int, watermark: RexNode): RelNode = { - new LogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark) + new LogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark) + } + + override def withHints(hintList: util.List[RelHint]): RelNode = { + new LogicalWatermarkAssigner(cluster, traits, input, hintList, rowtimeFieldIndex, watermarkExpr) } } @@ -48,9 +57,10 @@ object LogicalWatermarkAssigner { def create( cluster: RelOptCluster, input: RelNode, + hints: util.List[RelHint], 
rowtimeFieldIndex: Int, watermarkExpr: RexNode): LogicalWatermarkAssigner = { val traits = cluster.traitSetOf(Convention.NONE) - new LogicalWatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) + new LogicalWatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala index e4baf13567a..58137c195ad 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WatermarkAssigner.scala @@ -19,13 +19,16 @@ package org.apache.flink.table.planner.plan.nodes.calcite import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import com.google.common.collect.ImmutableList import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl} import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rel.hint.{Hintable, RelHint} import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName import java.util +import java.util.ArrayList import scala.collection.JavaConversions._ @@ -34,9 +37,11 @@ abstract class WatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, inputRel: RelNode, + val hints: util.List[RelHint], val rowtimeFieldIndex: Int, val watermarkExpr: RexNode) - extends SingleRel(cluster, traits, inputRel) { + extends SingleRel(cluster, traits, inputRel) + with Hintable { override def deriveRowType(): RelDataType = { val inputRowType = inputRel.getRowType @@ -68,10 +73,21 @@ abstract class WatermarkAssigner( } override def copy(traitSet: RelTraitSet, inputs: 
util.List[RelNode]): RelNode = { - copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr) + copy(traitSet, inputs.get(0), hints, rowtimeFieldIndex, watermarkExpr) } /** Copies a new WatermarkAssigner. */ - def copy(traitSet: RelTraitSet, input: RelNode, rowtime: Int, watermark: RexNode): RelNode + def copy( + traitSet: RelTraitSet, + input: RelNode, + hints: util.List[RelHint], + rowtime: Int, + watermark: RexNode): RelNode + override def getHints: ImmutableList[RelHint] = { + val arrayHints = hints.toArray(new Array[RelHint](0)) + ImmutableList.copyOf(arrayHints) + } + + def withHints(hintList: util.List[RelHint]): RelNode } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala index 795304a9f38..7468c83142f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala @@ -24,8 +24,12 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.convert.ConverterRule.Config +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode +import java.util +import java.util.Collections + /** * Sub-class of [[WatermarkAssigner]] that is a relational operator which generates * [[org.apache.flink.streaming.api.watermark.Watermark]]. 
@@ -34,20 +38,31 @@ class FlinkLogicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode) - extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkExpr) + extends WatermarkAssigner(cluster, traits, input, hints, rowtimeFieldIndex, watermarkExpr) with FlinkLogicalRel { /** Copies a new WatermarkAssigner. */ override def copy( traitSet: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtime: Int, watermark: RexNode): RelNode = { - new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark) + new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark) } + override def withHints(hintList: util.List[RelHint]): RelNode = { + new FlinkLogicalWatermarkAssigner( + cluster, + traitSet, + input, + hintList, /* apply the caller-supplied hints, not this node's current ones */ + rowtimeFieldIndex, + watermarkExpr) + } } class FlinkLogicalWatermarkAssignerConverter(config: Config) extends ConverterRule(config) { @@ -76,6 +91,12 @@ object FlinkLogicalWatermarkAssigner { watermarkExpr: RexNode): FlinkLogicalWatermarkAssigner = { val cluster = input.getCluster val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify() - new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeFieldIndex, watermarkExpr) + new FlinkLogicalWatermarkAssigner( + cluster, + traitSet, + input, + Collections.emptyList(), + rowtimeFieldIndex, + watermarkExpr) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala index 69973a8e6a3..f9c7ebb6861 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala +++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWatermarkAssigner.scala @@ -26,8 +26,11 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.{RelNode, RelWriter} +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rex.RexNode +import java.util + import scala.collection.JavaConversions._ /** Stream physical RelNode for [[WatermarkAssigner]]. */ @@ -35,9 +38,10 @@ class StreamPhysicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, inputRel: RelNode, + hints: util.List[RelHint], rowtimeFieldIndex: Int, watermarkExpr: RexNode) - extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex, watermarkExpr) + extends WatermarkAssigner(cluster, traits, inputRel, hints, rowtimeFieldIndex, watermarkExpr) with StreamPhysicalRel { override def requireWatermark: Boolean = false @@ -45,9 +49,10 @@ class StreamPhysicalWatermarkAssigner( override def copy( traitSet: RelTraitSet, input: RelNode, + hints: util.List[RelHint], rowtime: Int, watermark: RexNode): RelNode = { - new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, rowtime, watermark) + new StreamPhysicalWatermarkAssigner(cluster, traitSet, input, hints, rowtime, watermark) } /** Fully override this method to have a better display name of this RelNode. 
*/ @@ -75,4 +80,14 @@ class StreamPhysicalWatermarkAssigner( FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) } + + override def withHints(hintList: util.List[RelHint]): RelNode = { + new StreamPhysicalWatermarkAssigner( + cluster, + traitSet, + input, + hintList, /* apply the caller-supplied hints, not this node's current ones */ + rowtimeFieldIndex, + watermarkExpr) + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala index c6b62b0d09e..ba2e10f99d4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWatermarkAssignerRule.scala @@ -26,6 +26,8 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.convert.ConverterRule.Config +import java.util.Collections + /** Rule that converts [[FlinkLogicalWatermarkAssigner]] to [[StreamPhysicalWatermarkAssigner]]. 
*/ class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule(config) { @@ -39,6 +41,7 @@ class StreamPhysicalWatermarkAssignerRule(config: Config) extends ConverterRule( watermarkAssigner.getCluster, traitSet, convertInput, + Collections.emptyList(), watermarkAssigner.rowtimeFieldIndex, watermarkAssigner.watermarkExpr ) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java index 674fdc602ba..d23f440b51b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java @@ -66,6 +66,31 @@ class StateTtlHintTest extends TableTestBase { + " 'connector' = 'values'\n" + ")"); + util.tableEnv() + .executeSql( + "CREATE TABLE tableWithWatermark1 (\n" + + " a INT,\n" + + " b BIGINT,\n" + + " c TIMESTAMP(3)," + + " WATERMARK FOR c AS c" + + ") WITH (" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false'\n" + + ")"); + + util.tableEnv() + .executeSql( + "CREATE TABLE tableWithWatermark2 (" + + " a int,\n" + + " b BIGINT,\n" + + " c ROW<c1 TIMESTAMP(3)>,\n" + + " d AS c.c1 + INTERVAL '5' SECOND,\n" + + " WATERMARK FOR d as d - INTERVAL '5' second" + + ") WITH (" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false'\n" + + ")"); + util.tableEnv().executeSql("CREATE View V4 as select a3 as a4, b3 as b4 from T3"); util.tableEnv() @@ -297,6 +322,26 @@ class StateTtlHintTest extends TableTestBase { "Invalid STATE_TTL hint, expecting at least one key-value options specified."); } + @Test + void testWatermarkAssigner() { + String sql = + "\n" + + "SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1\n" + + "LEFT 
JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2\n" + + "ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL"; + verify(sql); + } + + @Test + void testWatermarkAssignerWithAliases() { + String sql = + "\n" + + "SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM tableWithWatermark1 tww1\n" + + "LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2\n" + + "ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL"; + verify(sql); + } + private void verify(String sql) { util.doVerifyPlan( sql, diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml index b395e6002b2..8b1a70096f7 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml @@ -494,4 +494,82 @@ GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS EXPR$1]) ]]> </Resource> </TestCase> + <TestCase name="testWatermarkAssigner"> + <Resource name="sql"> + <![CDATA[ +SELECT /*+ STATE_TTL('tableWithWatermark1'='1d', 'tww2' = '3d') */ tableWithWatermark1.* FROM tableWithWatermark1 +LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2 +ON tableWithWatermark1.b = tww2.b WHERE tww2.b IS NOT NULL]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[IS NOT NULL($3)]) + +- LogicalJoin(condition=[=($1, $3)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, tableWithWatermark1=1d}]]]) + :- LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS options:[tableWithWatermark1]]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark1]]) + +- 
LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]]) + +- LogicalProject(b=[$1]) + +- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c]) ++- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]]) + :- Exchange(distribution=[hash[b]]) + : +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT NULL(b)]) + : +- WatermarkAssigner(rowtime=[c], watermark=[c]) + : +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark1]], fields=[a, b, c]) + +- Exchange(distribution=[hash[b]]) + +- GroupAggregate(groupBy=[b], select=[b]) + +- Exchange(distribution=[hash[b]]) + +- Calc(select=[b], where=[IS NOT NULL(b)]) + +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL SECOND)]) + +- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d]) + +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark2, project=[b, c], metadata=[]]], fields=[b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testWatermarkAssignerWithAliases"> + <Resource name="sql"> + <![CDATA[ +SELECT /*+ STATE_TTL('tww1'='1d', 'tww2' = '3d') */ tww1.* FROM tableWithWatermark1 tww1 +LEFT JOIN(SELECT DISTINCT b FROM tableWithWatermark2) tww2 +ON tww1.b = tww2.b WHERE tww2.b IS NOT NULL]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalFilter(condition=[IS NOT NULL($3)]) + +- LogicalJoin(condition=[=($1, $3)], joinType=[left], stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{tww2=3d, tww1=1d}]]]) + :- 
LogicalWatermarkAssigner(rowtime=[c], watermark=[$2], hints=[[[ALIAS options:[tww1]]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark1]]) + +- LogicalAggregate(group=[{0}], hints=[[[ALIAS options:[tww2]]]]) + +- LogicalProject(b=[$1]) + +- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)], hints=[[[ALIAS options:[tableWithWatermark2]]]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2.c1, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, tableWithWatermark2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c]) ++- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=3d}]]]) + :- Exchange(distribution=[hash[b]]) + : +- Calc(select=[a, b, CAST(c AS TIMESTAMP(3)) AS c], where=[IS NOT NULL(b)]) + : +- WatermarkAssigner(rowtime=[c], watermark=[c]) + : +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark1]], fields=[a, b, c]) + +- Exchange(distribution=[hash[b]]) + +- GroupAggregate(groupBy=[b], select=[b]) + +- Exchange(distribution=[hash[b]]) + +- Calc(select=[b], where=[IS NOT NULL(b)]) + +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL SECOND)]) + +- Calc(select=[b, +(c.c1, 5000:INTERVAL SECOND) AS d]) + +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark2, project=[b, c], metadata=[]]], fields=[b, c]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml index 261f438fb83..a584fbbe866 100644 --- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRuleTest.xml @@ -181,8 +181,8 @@ LogicalProject(a=[$0]) +- LogicalProject(a=[$0], b=[$1], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, UdfTable]]) ]]> - </Resource> - </TestCase> + </Resource> + </TestCase> <TestCase name="transposeWithIncludeComputedRowTime"> <Resource name="sql"> <![CDATA[SELECT a, b, d FROM VirtualTable]]> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 832ab36255a..ba57c2bca3a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1321,6 +1321,7 @@ case class StreamTableTestUtil( sourceRel.getCluster, sourceRel.getTraitSet, sourceRel, + Collections.emptyList(), rowtimeFieldIdx, expr )