dawidwys commented on a change in pull request #10763: [FLINK-14200][table] Fix 
NPE for Temporal Table Function Join when left side is a query instead of a 
source
URL: https://github.com/apache/flink/pull/10763#discussion_r363215985
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
 ##########
 @@ -158,11 +160,87 @@ class TemporalJoinITCase(state: StateBackendMode)
 
     assertEquals(expectedOutput, sink.getAppendResults.toSet)
   }
+
+  @Test
+  def testNestedTemporalJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val sqlQuery =
+      """
+        |SELECT
+        |  o.amount, r.rate, p.price
+        |FROM
+        |  Orders AS o,
+        |  LATERAL TABLE (Rates(o.rowtime)) AS r,
+        |  LATERAL TABLE (Prices(o.rowtime)) AS p
+        |WHERE r.currency = o.currency AND p.productId = o.productId
+        |""".stripMargin
+
+    val ordersData = new mutable.MutableList[(Long, String, String, Timestamp)]
+    ordersData.+=((2L, "A1", "Euro", new Timestamp(2L)))
+    ordersData.+=((1L, "A2", "US Dollar", new Timestamp(3L)))
+    ordersData.+=((50L, "A4", "Yen", new Timestamp(4L)))
+    ordersData.+=((3L, "A2", "Euro", new Timestamp(5L)))
+
+    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
+    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
+    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
+    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
+
+    val pricesHistoryData = new mutable.MutableList[(String, Double, Timestamp)]
+    pricesHistoryData.+=(("A2", 10.2D, new Timestamp(1L)))
+    pricesHistoryData.+=(("A1", 11.4D, new Timestamp(1L)))
+    pricesHistoryData.+=(("A4", 1D, new Timestamp(1L)))
+    pricesHistoryData.+=(("A1", 11.6D, new Timestamp(5L)))
+    pricesHistoryData.+=(("A1", 11.9D, new Timestamp(7L)))
+
+    val orders = env
+      .fromCollection(ordersData)
+      .asInstanceOf[DataStream[Product]]
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .toTable(tEnv, 'amount, 'productId, 'currency, 'rowtime.rowtime)
+    val ratesHistory = env
+      .fromCollection(ratesHistoryData)
+      .asInstanceOf[DataStream[Product]]
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
+    val pricesHistory = env
+      .fromCollection(pricesHistoryData)
+      .asInstanceOf[DataStream[Product]]
+      .assignTimestampsAndWatermarks(new TimestampExtractor())
+      .toTable(tEnv, 'productId, 'price, 'rowtime.rowtime)
+
+    tEnv.createTemporaryView("Orders", orders)
+    tEnv.createTemporaryView("RatesHistory", ratesHistory)
+    tEnv.registerFunction(
+      "Rates",
+      ratesHistory.createTemporalTableFunction("rowtime", "currency"))
+    tEnv.registerFunction(
+      "Prices",
+      pricesHistory.createTemporalTableFunction("rowtime", "productId"))
+
+    tEnv.createTemporaryView("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
+
+    // Scan from registered table to test for interplay between
+    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
+    val result = tEnv.from("TemporalJoinResult").toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List("1,102,10.2", "3,116,10.2", "2,114,11.4", "50,1,1.0")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
 
-class TimestampExtractor[T1, T2]
-  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10))  {
-  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
-    element._3.getTime
+class TimestampExtractor
+  extends BoundedOutOfOrdernessTimestampExtractor[Product](Time.seconds(10))  {
+  override def extractTimestamp(element: Product): Long = element match {
+    case (_, _, ts: Timestamp) => ts.getTime
+    case (_, _, _, ts: Timestamp) => ts.getTime
 
 Review comment:
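   Please add a default branch, so that an element of an unsupported shape
   fails with a descriptive error instead of a bare `MatchError`:
   ```
   case _ => throw new IllegalArgumentException(
     "Expected the last element in a tuple to be of a Timestamp type.")
   ```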
   
   This will make it easier to use this extractor in future tests.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to