This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 71f23066562 [FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate
71f23066562 is described below

commit 71f23066562018910e5599719d2a1bc0eb17f430
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Thu Mar 23 12:04:42 2023 +0800

    [FLINK-31424][table-planner] Fix NPE produced by multiple sink with local-global window aggregate
    
    This closes #22238
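
    For context, a minimal sketch of the failing pattern, distilled from
    the regression test added in this commit
    (WindowJoinTest#testJoinToMultiSink); the table, view, and sink names
    below are illustrative. A window-TVF aggregate, which the planner
    splits into a LocalWindowAggregate/GlobalWindowAggregate pair, is
    self-joined and written to two sinks from a single statement set;
    deriving window properties for the shared local window aggregate
    previously threw the NPE fixed here.

        -- windowed aggregate; planned as a local/global window aggregate pair
        CREATE TEMPORARY VIEW agg_view AS
        SELECT user_id, window_start, window_end
        FROM TABLE(TUMBLE(TABLE food_order, DESCRIPTOR(event_time), INTERVAL '1' MINUTES))
        GROUP BY user_id, window_start, window_end;

        -- self-join on the windowed aggregate
        CREATE TEMPORARY VIEW join_view AS
        SELECT l.window_start, l.window_end, l.user_id
        FROM agg_view AS l
        LEFT JOIN agg_view AS r
          ON l.user_id = r.user_id
         AND l.window_start = r.window_start
         AND l.window_end = r.window_end;

        -- two INSERTs into different sinks submitted as one statement set
        INSERT INTO sink1 SELECT * FROM join_view;
        INSERT INTO sink2 SELECT * FROM join_view;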
---
 .../plan/metadata/FlinkRelMdWindowProperties.scala |  40 ++-
 .../table/planner/plan/stats/FlinkStatistic.scala  |   6 +-
 .../plan/stream/sql/join/WindowJoinTest.xml        |  72 +++++
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 318 ++++++++++++++++++++-
 .../metadata/FlinkRelMdWindowPropertiesTest.scala  | 184 ++++++++++++
 .../planner/plan/metadata/MetadataTestUtil.scala   |  36 +++
 .../plan/stream/sql/join/WindowJoinTest.scala      |  60 ++++
 7 files changed, 708 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
index 30087585974..a688302e914 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.planner.{JArrayList, JHashMap, JList}
 import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.WindowSpec
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, WatermarkAssigner}
 import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalCorrelate, FlinkLogicalJoin, FlinkLogicalRank}
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
@@ -27,6 +28,7 @@ import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
 import org.apache.flink.table.planner.plan.utils.WindowJoinUtil.satisfyWindowJoin
 import org.apache.flink.table.planner.plan.utils.WindowUtil.{convertToWindowingStrategy, groupingContainsWindowStartEnd, isWindowTableFunctionCall}
 import org.apache.flink.table.runtime.groupwindow._
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.plan.volcano.RelSubset
@@ -254,11 +256,34 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
   def getWindowProperties(
       rel: StreamPhysicalWindowAggregate,
       mq: RelMetadataQuery): RelWindowProperties = {
+    getWindowAggregateWindowProperties(
+      rel.grouping.length + rel.aggCalls.size(),
+      rel.namedWindowProperties,
+      rel.windowing.getWindow,
+      rel.windowing.getTimeAttributeType
+    )
+  }
+
+  def getWindowProperties(
+      rel: StreamPhysicalGlobalWindowAggregate,
+      mq: RelMetadataQuery): RelWindowProperties = {
+    getWindowAggregateWindowProperties(
+      rel.grouping.length + rel.aggCalls.size(),
+      rel.namedWindowProperties,
+      rel.windowing.getWindow,
+      rel.windowing.getTimeAttributeType
+    )
+  }
+
+  private def getWindowAggregateWindowProperties(
+      propertyOffset: Int,
+      windowProperties: Seq[NamedWindowProperty],
+      windowSpec: WindowSpec,
+      timeAttributeType: LogicalType): RelWindowProperties = {
     val starts = ArrayBuffer[Int]()
     val ends = ArrayBuffer[Int]()
     val times = ArrayBuffer[Int]()
-    val propertyOffset = rel.grouping.length + rel.aggCalls.size()
-    rel.namedWindowProperties.map(_.getProperty).zipWithIndex.foreach {
+    windowProperties.map(_.getProperty).zipWithIndex.foreach {
       case (p, index) =>
         p match {
           case _: WindowStart =>
@@ -275,11 +300,18 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
       ImmutableBitSet.of(starts: _*),
       ImmutableBitSet.of(ends: _*),
       ImmutableBitSet.of(times: _*),
-      rel.windowing.getWindow,
-      rel.windowing.getTimeAttributeType
+      windowSpec,
+      timeAttributeType
     )
   }
 
+  def getWindowProperties(
+      rel: StreamPhysicalLocalWindowAggregate,
+      mq: RelMetadataQuery): RelWindowProperties = {
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    fmq.getRelWindowProperties(rel.getInput)
+  }
+
   def getWindowProperties(
       rel: StreamPhysicalWindowRank,
       mq: RelMetadataQuery): RelWindowProperties = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
index 4d5b4ecf2f7..7158a8039c9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.stats
 
-import org.apache.flink.table.catalog.{ContextResolvedTable, ResolvedSchema, UniqueConstraint}
+import org.apache.flink.table.catalog.{ResolvedSchema, UniqueConstraint}
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.apache.flink.table.planner.plan.`trait`.{RelModifiedMonotonicity, RelWindowProperties}
 
@@ -29,7 +29,6 @@ import org.apache.calcite.util.ImmutableBitSet
 import javax.annotation.Nullable
 
 import java.util
-import java.util.{HashSet, Optional, Set}
 
 import scala.collection.JavaConversions._
 
@@ -206,6 +205,7 @@ object FlinkStatistic {
       this.tableStats = statistic.getTableStats
       this.uniqueKeys = statistic.getUniqueKeys
       this.relModifiedMonotonicity = statistic.getRelModifiedMonotonicity
+      this.windowProperties = statistic.getRelWindowProperties
       this
     }
 
@@ -218,7 +218,7 @@ object FlinkStatistic {
       ) {
         UNKNOWN
       } else {
-        new FlinkStatistic(tableStats, uniqueKeys, relModifiedMonotonicity)
+        new FlinkStatistic(tableStats, uniqueKeys, relModifiedMonotonicity, windowProperties)
       }
     }
   }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index 39a7abd3fd2..f94791d3b5a 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -154,6 +154,78 @@ WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], si
             +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, slice_end('w$) AS $slice_end])
                +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                   +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinToMultiSink">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink1], fields=[window_start, window_end, user_id, dt, hour])
++- LogicalProject(window_start=[$1], window_end=[$2], user_id=[$0], dt=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'yyyyMMdd')], hour=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'HH')])
+   +- LogicalJoin(condition=[AND(=($0, $3), =($1, $4), =($2, $5))], joinType=[left])
+      :- LogicalAggregate(group=[{0, 1, 2}])
+      :  +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+      :     +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3), 60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME* event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+      :        +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2], event_time=[$3])
+      :           +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$3])
+      :              +- LogicalTableScan(table=[[default_catalog, default_database, food_order]])
+      +- LogicalAggregate(group=[{0, 1, 2}])
+         +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+            +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3), 60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME* event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+               +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2], event_time=[$3])
+                  +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$3])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, food_order]])
+
+LogicalSink(table=[default_catalog.default_database.sink2], fields=[window_start, window_end, user_id, dt, hour])
++- LogicalProject(window_start=[$1], window_end=[$2], user_id=[$0], dt=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'yyyyMMdd')], hour=[DATE_FORMAT(+($2, 25200000:INTERVAL HOUR), _UTF-16LE'HH')])
+   +- LogicalJoin(condition=[AND(=($0, $3), =($1, $4), =($2, $5))], joinType=[left])
+      :- LogicalAggregate(group=[{0, 1, 2}])
+      :  +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+      :     +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3), 60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME* event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+      :        +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2], event_time=[$3])
+      :           +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$3])
+      :              +- LogicalTableScan(table=[[default_catalog, default_database, food_order]])
+      +- LogicalAggregate(group=[{0, 1, 2}])
+         +- LogicalProject(user_id=[$0], window_start=[$4], window_end=[$5])
+            +- LogicalTableFunctionScan(invocation=[TUMBLE($3, DESCRIPTOR($3), 60000:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, INTEGER amount, TIMESTAMP(3) *ROWTIME* event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+               +- LogicalProject(user_id=[$0], order_id=[$1], amount=[$2], event_time=[$3])
+                  +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$3])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, food_order]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[window_start, window_end, user_id, dt, hour])
++- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'HH') AS hour])
+   +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)], select=[user_id, window_start, window_end, user_id0, window_start0, window_end0])
+      :- Exchange(distribution=[hash[user_id]])
+      :  +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+      :     +- Exchange(distribution=[hash[user_id]])
+      :        +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+      :           +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+      :              +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+      +- Exchange(distribution=[hash[user_id]])
+         +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+            +- Exchange(distribution=[hash[user_id]])
+               +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+                  +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+                     +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[window_start, window_end, user_id, dt, hour])
++- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'HH') AS hour])
+   +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)], select=[user_id, window_start, window_end, user_id0, window_start0, window_end0])
+      :- Exchange(distribution=[hash[user_id]])
+      :  +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+      :     +- Exchange(distribution=[hash[user_id]])
+      :        +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+      :           +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+      :              +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
+      +- Exchange(distribution=[hash[user_id]])
+         +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end])
+            +- Exchange(distribution=[hash[user_id]])
+               +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end])
+                  +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
+                     +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 5535fc0671e..48d2dc204e0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.catalog.ContextResolvedFunction
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
@@ -25,6 +26,7 @@ import org.apache.flink.table.functions.{FunctionIdentifier, UserDefinedFunction
 import org.apache.flink.table.operations.TableSourceQueryOperation
 import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.planner.delegation.PlannerContext
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.functions.utils.AggSqlFunction
 import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
@@ -39,6 +41,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable}
 import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable
 import org.apache.flink.table.planner.plan.utils._
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions
 import org.apache.flink.table.planner.utils.{PlannerMocks, Top3}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
 import org.apache.flink.table.runtime.groupwindow._
@@ -48,6 +51,7 @@ import org.apache.flink.table.types.logical._
 import org.apache.flink.table.types.utils.TypeConversions
 
 import com.google.common.collect.{ImmutableList, Lists}
+import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
 import org.apache.calcite.prepare.CalciteCatalogReader
@@ -60,7 +64,7 @@ import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQu
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.{SqlAggFunction, SqlWindow}
+import org.apache.calcite.sql.{SqlAggFunction, SqlIntervalQualifier, SqlWindow}
 import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.calcite.sql.parser.SqlParserPos
@@ -197,6 +201,13 @@ class FlinkRelMdHandlerTestBase {
   protected lazy val tableSourceTableNonKeyStreamScan: StreamPhysicalDataStreamScan =
     createTableSourceTable(ImmutableList.of("TableSourceTable3"), streamPhysicalTraits)
 
+  protected lazy val temporalTableLogicalScan: LogicalTableScan =
+    createDataStreamScan(ImmutableList.of("TemporalTable4"), logicalTraits)
+  protected lazy val temporalTableFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+    createDataStreamScan(ImmutableList.of("TemporalTable4"), flinkLogicalTraits)
+  protected lazy val temporalTableStreamScan: StreamPhysicalDataStreamScan =
+    createDataStreamScan(ImmutableList.of("TemporalTable4"), streamPhysicalTraits)
+
   private lazy val valuesType = relBuilder.getTypeFactory
     .builder()
     .add("a", SqlTypeName.BIGINT)
@@ -3156,6 +3167,10 @@ class FlinkRelMdHandlerTestBase {
   protected lazy val batchCumulateWindowTVFRel = createWindowTVFRel(false, cumulateWindowSpec)
   protected lazy val streamCumulateWindowTVFRel = createWindowTVFRel(true, cumulateWindowSpec)
 
+  protected lazy val timeAttributeType = new TimestampType(true, TimestampKind.ROWTIME, 3)
+  protected lazy val proctimeType = new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3)
+  protected lazy val windowStartEndType = new TimestampType(false, 3)
+
   protected def createWindowTVFRel(
       isStreamingMode: Boolean,
       windowSpec: WindowSpec): CommonPhysicalWindowTableFunction = {
@@ -3195,6 +3210,307 @@ class FlinkRelMdHandlerTestBase {
     }
   }
 
+  // equivalent SQL is
+  // SELECT * FROM
+  //   TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(ptime), INTERVAL '10' MINUTE))
+  // CREATE TEMPORARY VIEW tmp AS
+  // SELECT `id`, `name`, PROCTIME() AS ptime FROM student
+  protected lazy val windowTableFunctionScan: TableFunctionScan = createTableFunctionScan(true)
+
+  // equivalent SQL is
+  // SELECT `name`, `val` FROM student,
+  // LATERAL TABLE(STRING_SPLIT(`name`, CAST(`id` AS STRING))) AS T(`val`);
+  protected lazy val lateralTableFunctionScan: TableFunctionScan = createTableFunctionScan(false)
+
+  protected def createTableFunctionScan(windowFunctionCall: Boolean): TableFunctionScan = {
+    relBuilder.push(studentLogicalScan)
+
+    if (windowFunctionCall) {
+      val projects = List(
+        relBuilder.field(0),
+        relBuilder.field(1),
+        relBuilder.call(FlinkSqlOperatorTable.PROCTIME))
+      val outputRowType = typeFactory.buildRelNodeRowType(
+        Array("id", "name", "ptime"),
+        Array(
+          new BigIntType,
+          new VarCharType,
+          proctimeType
+        )
+      )
+      val calcOnStudentScan =
+        createLogicalCalc(
+          studentLogicalScan,
+          outputRowType,
+          projects,
+          null
+        )
+      new FlinkLogicalTableFunctionScan(
+        cluster,
+        logicalTraits,
+        ImmutableList.of(calcOnStudentScan),
+        relBuilder.call(
+          FlinkSqlOperatorTable.TUMBLE,
+          relBuilder.field(2),
+          relBuilder.call(FlinkSqlOperatorTable.DESCRIPTOR, relBuilder.field(2)),
+          rexBuilder.makeIntervalLiteral(
+            bd(600000L),
+            new SqlIntervalQualifier(TimeUnit.MILLISECOND, null, SqlParserPos.ZERO))
+        ),
+        null,
+        typeFactory.builder
+          .kind(outputRowType.getStructKind)
+          .addAll(outputRowType.getFieldList)
+          .add("window_start", SqlTypeName.TIMESTAMP, 3)
+          .add("window_end", SqlTypeName.TIMESTAMP, 3)
+          .add("window_time", outputRowType.getFieldList.get(2).getType)
+          .build,
+        null)
+    } else {
+      val correlVar = rexBuilder.makeCorrel(
+        typeFactory.buildRelNodeRowType(
+          Array("id", "name", "val"),
+          Array(
+            new BigIntType,
+            new VarCharType,
+            new VarCharType
+          )
+        ),
+        new CorrelationId(0))
+      val tableFunctionCall = relBuilder.call(
+        BridgingSqlFunction.of(
+          relBuilder.getCluster,
+          ContextResolvedFunction.temporary(
+            FunctionIdentifier.of("STRING_SPLIT"),
+            new JavaUserDefinedTableFunctions.StringSplit())),
+        rexBuilder.makeFieldAccess(correlVar, 1),
+        rexBuilder.makeCall(stringType, CAST, List(rexBuilder.makeFieldAccess(correlVar, 0)))
+      )
+      new FlinkLogicalTableFunctionScan(
+        cluster,
+        logicalTraits,
+        ImmutableList.of(),
+        tableFunctionCall,
+        null,
+        typeFactory.buildRelNodeRowType(Array("EXPR$0"), Array(new 
VarCharType)),
+        null
+      )
+    }
+  }
+
+  // equivalent SQL is
+  // SELECT b, window_end AS my_window_end, window_start FROM
+  //   TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE))
+  protected lazy val keepWindowCalcOnTumbleWindowTVF: Calc =
+    createCalcOnWindowTVF(streamTumbleWindowTVFRel, true)
+
+  // equivalent SQL is
+  // SELECT b, CAST(window_end AS STRING) AS my_end, CAST(window_start AS STRING) AS my_start FROM
+  //   TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE))
+  protected lazy val discardWindowCalcOnTumbleWindowTVF: Calc =
+    createCalcOnWindowTVF(streamTumbleWindowTVFRel, false)
+  // equivalent SQL is
+  // SELECT b, window_end AS my_window_end, window_start FROM
+  //   TABLE(HOP (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
+  protected lazy val keepWindowCalcOnHopWindowTVF: Calc =
+    createCalcOnWindowTVF(streamHopWindowTVFRel, true)
+
+  // equivalent SQL is
+  // SELECT b, CAST(window_end AS STRING) AS my_end, CAST(window_start AS STRING) AS my_start FROM
+  //   TABLE(HOP (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
+  protected lazy val discardWindowCalcOnHopWindowTVF: Calc =
+    createCalcOnWindowTVF(streamHopWindowTVFRel, false)
+  // equivalent SQL is
+  // SELECT b, window_end AS my_window_end, window_start FROM
+  //   TABLE(CUMULATE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
+  protected lazy val keepWindowCalcOnCumulateWindowTVF: Calc =
+    createCalcOnWindowTVF(streamCumulateWindowTVFRel, true)
+
+  // equivalent SQL is
+  // SELECT b, CAST(window_end AS STRING) AS my_end, CAST(window_start AS STRING) AS my_start FROM
+  //   TABLE(CUMULATE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
+  protected lazy val discardWindowCalcOnCumulateWindowTVF: Calc =
+    createCalcOnWindowTVF(streamCumulateWindowTVFRel, false)
+  protected def createCalcOnWindowTVF(
+      tvf: CommonPhysicalWindowTableFunction,
+      projectKeepWindow: Boolean): Calc = {
+    relBuilder.push(tvf)
+    val (projects, outputType) = {
+      if (projectKeepWindow) {
+        (
+          List(relBuilder.field(1), relBuilder.field(6), relBuilder.field(5)),
+          typeFactory.buildRelNodeRowType(
+            Array("b", "my_window_end", "window_start"),
+            Array(
+              VarCharType.STRING_TYPE,
+              windowStartEndType,
+              windowStartEndType
+            )
+          ))
+      } else {
+        (
+          List(
+            relBuilder.field(1),
+            rexBuilder.makeCast(stringType, relBuilder.field(6)),
+            rexBuilder.makeCast(stringType, relBuilder.field(5))),
+          typeFactory.buildRelNodeRowType(
+            Array("b", "my_window_end", "window_start"),
+            Array(
+              VarCharType.STRING_TYPE,
+              VarCharType.STRING_TYPE,
+              VarCharType.STRING_TYPE
+            )
+          ))
+      }
+    }
+    val program = RexProgram.create(tvf.getRowType, projects, null, outputType, rexBuilder)
+    new StreamPhysicalCalc(
+      cluster,
+      streamPhysicalTraits,
+      tvf,
+      program,
+      program.getOutputRowType
+    )
+  }
+
+  // equivalent SQL is
+  // SELECT * FROM
+  //   TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE))
+  //  UNION ALL
+  // (SELECT * FROM
+  //   TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE)))
+  protected lazy val unionOnWindowTVFWithSameWindowSpec: Union =
+    createUnionOnWindowTVF(streamTumbleWindowTVFRel, streamTumbleWindowTVFRel)
+
+  // SELECT * FROM
+  //   TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE))
+  //  UNION ALL
+  // (SELECT * FROM
+  //   TABLE(HOP (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)))
+  protected lazy val unionOnWindowTVFWithDifferentWindowSpec: Union =
+    createUnionOnWindowTVF(streamTumbleWindowTVFRel, streamHopWindowTVFRel)
+
+  protected def createUnionOnWindowTVF(
+      tvf1: CommonPhysicalWindowTableFunction,
+      tvf2: CommonPhysicalWindowTableFunction): Union = {
+    new StreamPhysicalUnion(cluster, streamPhysicalTraits, List(tvf1, tvf2), true, tvf1.getRowType)
+  }
+
+  // hash by field a
+  protected lazy val hashOnTumbleWindowTVF = createExchangeOnWindowTVF(streamTumbleWindowTVFRel)
+  protected lazy val hashOnHopWindowTVF = createExchangeOnWindowTVF(streamHopWindowTVFRel)
+  protected lazy val hashOnCumulateWindowTVF = createExchangeOnWindowTVF(streamCumulateWindowTVFRel)
+
+  protected def createExchangeOnWindowTVF(tvf: CommonPhysicalWindowTableFunction): Exchange = {
+    val hash = FlinkRelDistribution.hash(Array(0), requireStrict = true)
+    new StreamPhysicalExchange(
+      cluster,
+      streamPhysicalTraits.replace(hash),
+      tvf,
+      hash
+    )
+  }
+
+  // equivalent SQL is
+  // CREATE TEMPORARY VIEW tmp AS
+  // SELECT `id`, `name`, PROCTIME() AS ptime FROM student
+  // SELECT `id`, `window_start`, `window_end`, COUNT(DISTINCT `name`) AS cnt FROM
+  //   (TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(ptime), INTERVAL '10' MINUTE))) alias GROUP BY `id`, `window_start`, `window_end`
+  protected lazy val logicalGroupWindowAggOnTumbleWindowTVF = createLogicalAggregateOnWindowTVF(
+    true)
+
+  // equivalent SQL is
+  // CREATE TEMPORARY VIEW tmp AS
+  // SELECT `id`, `name`, PROCTIME() AS ptime FROM student
+  // SELECT `id`, COUNT(DISTINCT `name`) AS cnt FROM
+  //   (TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(ptime), INTERVAL '10' MINUTE))) alias GROUP BY `id`
+  protected lazy val logicalGroupAggOnTumbleWindowTVF = createLogicalAggregateOnWindowTVF(false)
+  protected def createLogicalAggregateOnWindowTVF(groupByWindow: Boolean): FlinkLogicalAggregate = {
+    relBuilder.push(windowTableFunctionScan)
+    val groupKey =
+      if (groupByWindow)
+        List(relBuilder.field(0), relBuilder.field(3), relBuilder.field(4))
+      else List(relBuilder.field(0))
+    val logicalAgg =
+      relBuilder
+        .aggregate(
+          relBuilder.groupKey(groupKey),
+          relBuilder.count(true, "cnt", relBuilder.field(1)))
+        .build()
+        .asInstanceOf[LogicalAggregate]
+    new FlinkLogicalAggregate(
+      cluster,
+      flinkLogicalTraits,
+      windowTableFunctionScan,
+      logicalAgg.getGroupSet,
+      logicalAgg.getGroupSets,
+      logicalAgg.getAggCallList
+    )
+  }
+
+  // equivalent SQL is
+  // SELECT window_start, window_end, SUM(a) AS sum_a, COUNT(b) AS cnt_b FROM
+  //   TABLE(TUMBLE (TABLE TemporalTable1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE))
+  // GROUP BY window_start, window_end
+  protected lazy val (
+    streamWindowAggOnWindowTVF,
+    streamLocalWindowAggOnWindowTVF,
+    streamGlobalWindowAggOnWindowTVF) = {
+    val logicalAgg =
+      relBuilder
+        .push(streamTumbleWindowTVFRel)
+        .aggregate(
+          relBuilder.groupKey(relBuilder.field(5), relBuilder.field(6)),
+          relBuilder.sum(false, "sum_a", relBuilder.field(0)),
+          relBuilder.count(false, "cnt_b", relBuilder.field(1))
+        )
+        .build()
+        .asInstanceOf[LogicalAggregate]
+
+    val windowRef = new WindowReference("w$", timeAttributeType)
+    val namedWindowProperties: Seq[NamedWindowProperty] = Seq(
+      new NamedWindowProperty("window_start", new WindowStart(windowRef)),
+      new NamedWindowProperty("window_end", new WindowEnd(windowRef)))
+    val traitSet = streamPhysicalTraits.replace(FlinkRelDistribution.SINGLETON)
+    val streamWindowAggOnWindowTVF = new StreamPhysicalWindowAggregate(
+      cluster,
+      traitSet,
+      streamTumbleWindowTVFRel,
+      Array(),
+      logicalAgg.getAggCallList,
+      new WindowAttachedWindowingStrategy(tumbleWindowSpec, timeAttributeType, 5, 6),
+      namedWindowProperties)
+
+    val streamLocalWindowAggOnWindowTVF = new StreamPhysicalLocalWindowAggregate(
+      cluster,
+      traitSet,
+      streamTumbleWindowTVFRel,
+      streamWindowAggOnWindowTVF.grouping,
+      streamWindowAggOnWindowTVF.aggCalls,
+      streamWindowAggOnWindowTVF.windowing)
+
+    val exchange = new StreamPhysicalExchange(
+      cluster,
+      traitSet,
+      streamLocalWindowAggOnWindowTVF,
+      FlinkRelDistribution.SINGLETON)
+    val globalWindowing = new SliceAttachedWindowingStrategy(
+      tumbleWindowSpec,
+      timeAttributeType,
+      streamLocalWindowAggOnWindowTVF.getRowType.getFieldCount - 1)
+    val streamGlobalWindowAggOnWindowTVF = new StreamPhysicalGlobalWindowAggregate(
+      cluster,
+      traitSet,
+      exchange,
+      streamTumbleWindowTVFRel.getRowType,
+      Array(),
+      streamWindowAggOnWindowTVF.aggCalls,
+      globalWindowing,
+      namedWindowProperties)
+
+    (streamWindowAggOnWindowTVF, streamLocalWindowAggOnWindowTVF, streamGlobalWindowAggOnWindowTVF)
+  }
+
   // select * from TableSourceTable1
   // left join TableSourceTable2 on TableSourceTable1.b = TableSourceTable2.b
   protected lazy val logicalLeftJoinOnContainedUniqueKeys: RelNode = relBuilder
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowPropertiesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowPropertiesTest.scala
new file mode 100644
index 00000000000..3ef62d0ff43
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowPropertiesTest.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.metadata
+
+import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.WindowSpec
+import org.apache.flink.table.types.logical.LogicalType
+
+import org.apache.calcite.util.ImmutableBitSet
+import org.junit.Assert._
+import org.junit.Test
+
+/** Test for [[FlinkRelMdWindowProperties]]. */
+class FlinkRelMdWindowPropertiesTest extends FlinkRelMdHandlerTestBase {
+
+  @Test
+  def testGetWindowPropertiesOnTableScan(): Unit = {
+    // get window properties from statistics
+    Array(studentLogicalScan, studentFlinkLogicalScan, studentStreamScan).foreach(
+      scan => assertEquals(null, mq.getRelWindowProperties(scan)))
+
+    Array(temporalTableLogicalScan, temporalTableFlinkLogicalScan, temporalTableStreamScan).foreach(
+      scan =>
+        assertEquals(
+          createRelWindowProperties(0, 1, 2, tumbleWindowSpec, timeAttributeType),
+          mq.getRelWindowProperties(scan)
+        ))
+  }
+
+  @Test
+  def testGetWindowPropertiesOnTableFunctionScan(): Unit = {
+    Array(windowTableFunctionScan, lateralTableFunctionScan).zipWithIndex.foreach {
+      case (scan, idx) =>
+        assertEquals(
+          Array(createRelWindowProperties(3, 4, 5, tumbleWindowSpec, proctimeType), null).apply(
+            idx),
+          mq.getRelWindowProperties(scan)
+        )
+    }
+  }
+
+  @Test
+  def testGetWindowPropertiesOnWindowTableFunction(): Unit = {
+    Array(streamTumbleWindowTVFRel, streamHopWindowTVFRel, streamCumulateWindowTVFRel).zipWithIndex
+      .foreach {
+        case (tvf, idx) =>
+          assertEquals(
+            createRelWindowProperties(
+              5,
+              6,
+              7,
+              Array(tumbleWindowSpec, hopWindowSpec, cumulateWindowSpec).apply(idx),
+              timeAttributeType),
+            mq.getRelWindowProperties(tvf)
+          )
+      }
+  }
+
+  @Test
+  def testGetWindowPropertiesOnCalc(): Unit = {
+    Array(
+      keepWindowCalcOnTumbleWindowTVF,
+      keepWindowCalcOnHopWindowTVF,
+      keepWindowCalcOnCumulateWindowTVF).zipWithIndex
+      .foreach {
+        case (calc, idx) =>
+          assertEquals(
+            createRelWindowProperties(
+              2,
+              1,
+              -1,
+              Array(tumbleWindowSpec, hopWindowSpec, cumulateWindowSpec).apply(idx),
+              proctimeType),
+            mq.getRelWindowProperties(calc)
+          )
+      }
+    Array(
+      discardWindowCalcOnTumbleWindowTVF,
+      discardWindowCalcOnHopWindowTVF,
+      discardWindowCalcOnCumulateWindowTVF)
+      .foreach(
+        calc =>
+          assertEquals(
+            null,
+            mq.getRelWindowProperties(calc)
+          ))
+  }
+
+  @Test
+  def testGetWindowPropertiesOnUnion(): Unit = {
+    Array(unionOnWindowTVFWithSameWindowSpec, unionOnWindowTVFWithDifferentWindowSpec).zipWithIndex
+      .foreach {
+        case (union, idx) =>
+          assertEquals(
+            Array(createRelWindowProperties(5, 6, 7, tumbleWindowSpec, timeAttributeType), null)
+              .apply(idx),
+            mq.getRelWindowProperties(union)
+          )
+      }
+  }
+
+  @Test
+  def testGetWindowPropertiesOnExchange(): Unit = {
+    Array(hashOnTumbleWindowTVF, hashOnHopWindowTVF, hashOnCumulateWindowTVF).zipWithIndex.foreach {
+      case (exchange, idx) =>
+        assertEquals(
+          createRelWindowProperties(
+            5,
+            6,
+            7,
+            Array(
+              tumbleWindowSpec,
+              hopWindowSpec,
+              cumulateWindowSpec
+            ).apply(idx),
+            timeAttributeType),
+          mq.getRelWindowProperties(exchange)
+        )
+    }
+  }
+
+  @Test
+  def testGetWindowPropertiesOnLogicalAggregate(): Unit = {
+    Array(logicalGroupWindowAggOnTumbleWindowTVF, logicalGroupAggOnTumbleWindowTVF).zipWithIndex
+      .foreach {
+        case (groupAgg, idx) =>
+          assertEquals(
+            Array(
+              createRelWindowProperties(1, 2, -1, tumbleWindowSpec, proctimeType),
+              null
+            ).apply(idx),
+            mq.getRelWindowProperties(groupAgg)
+          )
+      }
+  }
+
+  @Test
+  def testGetWindowPropertiesOnPhysicalAggregate(): Unit = {
+    Array(
+      streamWindowAggOnWindowTVF,
+      streamLocalWindowAggOnWindowTVF,
+      streamGlobalWindowAggOnWindowTVF).zipWithIndex.foreach {
+      case (groupAgg, idx) =>
+        assertEquals(
+          Array(
+            createRelWindowProperties(2, 3, -1, tumbleWindowSpec, timeAttributeType),
+            createRelWindowProperties(5, 6, 7, tumbleWindowSpec, timeAttributeType),
+            createRelWindowProperties(2, 3, -1, tumbleWindowSpec, timeAttributeType)
+          ).apply(idx),
+          mq.getRelWindowProperties(groupAgg)
+        )
+    }
+  }
+
+  private def createRelWindowProperties(
+      start: Int,
+      end: Int,
+      time: Int,
+      spec: WindowSpec,
+      timeAttributeType: LogicalType): RelWindowProperties = {
+    RelWindowProperties.create(
+      ImmutableBitSet.of(start),
+      ImmutableBitSet.of(end),
+      if (time >= 0) ImmutableBitSet.of(time) else ImmutableBitSet.of(),
+      spec,
+      timeAttributeType)
+  }
+
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 671c4b854c9..af245270093 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -25,6 +25,8 @@ import org.apache.flink.table.connector.source.{DynamicTableSource, ScanTableSou
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory}
+import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
+import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -37,7 +39,9 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.{Schema, SchemaPlus, Table}
 import org.apache.calcite.schema.Schema.TableType
 import org.apache.calcite.sql.{SqlCall, SqlNode}
+import org.apache.calcite.util.ImmutableBitSet
 
+import java.time.Duration
 import java.util
 import java.util.Collections
 
@@ -57,6 +61,7 @@ object MetadataTestUtil {
     rootSchema.add("TemporalTable1", createTemporalTable1())
     rootSchema.add("TemporalTable2", createTemporalTable2())
     rootSchema.add("TemporalTable3", createTemporalTable3())
+    rootSchema.add("TemporalTable4", createTemporalTable4())
     rootSchema.add("TableSourceTable1", createTableSourceTable1())
     rootSchema.add("TableSourceTable2", createTableSourceTable2())
     rootSchema.add("TableSourceTable3", createTableSourceTable3())
@@ -265,6 +270,37 @@ object MetadataTestUtil {
     getMetadataTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats))
   }
 
+  private def createTemporalTable4(): Table = {
+    val fieldNames = Array("window_start", "window_end", "window_time", "a", 
"b", "c")
+    val fieldTypes = Array[LogicalType](
+      new TimestampType(false, 3),
+      new TimestampType(false, 3),
+      new TimestampType(true, TimestampKind.ROWTIME, 3),
+      new IntType(),
+      new BigIntType(),
+      VarCharType.STRING_TYPE
+    )
+
+    val windowProperties = RelWindowProperties.create(
+      ImmutableBitSet.of(0),
+      ImmutableBitSet.of(1),
+      ImmutableBitSet.of(2),
+      new TumblingWindowSpec(Duration.ofMinutes(10L), null),
+      fieldTypes.apply(2))
+
+    val colStatsMap = Map[String, ColumnStats](
+      "a" -> new ColumnStats(3740000000L, 0L, 4d, 4, null, null),
+      "b" -> new ColumnStats(53252726L, 1474L, 8d, 8, 100000000L, -100000000L),
+      "c" -> new ColumnStats(null, 0L, 18.6, 64, null, null)
+    )
+
+    val tableStats = new TableStats(4000000000L, colStatsMap)
+    getMetadataTable(
+      fieldNames,
+      fieldTypes,
+      new FlinkStatistic(tableStats, relWindowProperties = windowProperties))
+  }
+
   private val flinkContext = new FlinkContextImpl(
     false,
     TableConfig.getDefault,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
index 9788791e781..9fe0b3e7fbf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
@@ -1273,4 +1273,64 @@ class WindowJoinTest extends TableTestBase {
       """.stripMargin
     util.verifyRelPlan(sql)
   }
+
+  @Test
+  def testJoinToMultiSink(): Unit = {
+    val sourceDdl =
+      """
+        |CREATE TABLE food_order (
+        | user_id STRING,
+        | order_id STRING,
+        | amount INT,
+        | event_time TIMESTAMP(3),
+        | WATERMARK FOR event_time AS event_time
+        |) WITH (
+        |'connector' = 'values')
+        |""".stripMargin
+    util.tableEnv.executeSql(sourceDdl)
+
+    val query =
+      """
+        |CREATE TEMPORARY VIEW food_view AS
+        |WITH food AS ( 
+        |  SELECT user_id, 
+        |         window_start, 
+        |         window_end 
+        |  FROM TABLE(TUMBLE(TABLE food_order, DESCRIPTOR(event_time), INTERVAL '1' MINUTES)) 
+        |  GROUP BY 
+        |  user_id,
+        |  window_start,
+        |  window_end)
+        |SELECT food.window_start
+        |     ,food.window_end
+        |     ,food.user_id
+        |     ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'yyyyMMdd') AS dt
+        |     ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'HH') AS `hour`
+        |FROM food
+        |LEFT JOIN food AS a ON food.user_id = a.user_id
+        |AND food.window_start = a.window_start
+        |AND food.window_end = a.window_end
+        |""".stripMargin
+
+    util.tableEnv.executeSql(query)
+
+    val sinkDdl =
+      """
+        |CREATE TABLE %s (
+        | window_start TIMESTAMP(3),
+        | window_end TIMESTAMP(3),
+        | user_id STRING,
+        | dt STRING,
+        | `hour` STRING
+        |) WITH (
+        | 'connector' = 'values')
+        |""".stripMargin
+    util.tableEnv.executeSql(sinkDdl.format("sink1"))
+    util.tableEnv.executeSql(sinkDdl.format("sink2"))
+
+    val statementSet = util.tableEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM food_view")
+    statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM food_view")
+    util.verifyRelPlan(statementSet)
+  }
 }

