snuyanzin commented on code in PR #27616:
URL: https://github.com/apache/flink/pull/27616#discussion_r2839655142


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.scala:
##########
@@ -510,4 +512,142 @@ class OverAggregateTest extends TableTestBase {
 
     util.verifyExecPlan(sql)
   }
+
+  @Test
+  def testTemporalJoinWithWatermarks(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE orders (
+                     |  product_id STRING,
+                     |  amount BIGINT,
+                     |  order_ts TIMESTAMP(3),
+                     |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' 
SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.addTable(s"""
+                     |CREATE TABLE products (
+                     |  product_id STRING,
+                     |  record_ts STRING,
+                     |  mod_record_ts AS TO_TIMESTAMP(record_ts),
+                     |  PRIMARY KEY (product_id) NOT ENFORCED,
+                     |  WATERMARK FOR mod_record_ts AS mod_record_ts - 
INTERVAL '60' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.verifyExecPlan(s"""
+                           |SELECT count(o.amount) OVER (PARTITION BY 
o.product_id) AS amount_count
+                           |FROM orders AS o
+                           |LEFT JOIN products FOR SYSTEM_TIME AS OF 
o.order_ts AS p
+                           |ON o.product_id = p.product_id
+                           |""".stripMargin)
+  }
+
+  @Test
+  def testTemporalJoinWithWatermarksSeveralFunctions(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE orders (
+                     |  product_id STRING,
+                     |  amount BIGINT,
+                     |  order_ts TIMESTAMP(3),
+                     |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' 
SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.addTable(s"""
+                     |CREATE TABLE products (
+                     |  product_id STRING,
+                     |  record_ts STRING,
+                     |  mod_record_ts AS TO_TIMESTAMP(record_ts),
+                     |  PRIMARY KEY (product_id) NOT ENFORCED,
+                     |  WATERMARK FOR mod_record_ts AS mod_record_ts - 
INTERVAL '60' SECONDS
+                     |) WITH (
+                     |  'connector' = 'values'
+                     |)
+                     |""".stripMargin)
+
+    util.verifyExecPlan(
+      s"""
+         |SELECT last_value(o.amount) OVER (PARTITION BY o.product_id ORDER BY 
o.order_ts) AS last_amount,
+         |       lag(o.amount) OVER (PARTITION BY o.product_id ORDER BY 
o.order_ts) AS prev_amount
+         |FROM orders AS o
+         |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
+         |ON o.product_id = p.product_id
+         |""".stripMargin)
+  }
+
+  @Test
+  def testTemporalJoinWithWatermarksWithMaterializedTimeArg(): Unit = {

Review Comment:
   The last 2 tests are not regressions;
   it seems they just haven't worked for a long time, even on 1.20.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to