dawidwys commented on a change in pull request #10763: [FLINK-14200][table] Fix NPE for Temporal Table Function Join when left side is a query instead of a source
URL: https://github.com/apache/flink/pull/10763#discussion_r363215985
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala ########## @@ -158,11 +160,87 @@ class TemporalJoinITCase(state: StateBackendMode) assertEquals(expectedOutput, sink.getAppendResults.toSet) } + + @Test + def testNestedTemporalJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + + val sqlQuery = + """ + |SELECT + | o.amount, r.rate, p.price + |FROM + | Orders AS o, + | LATERAL TABLE (Rates(o.rowtime)) AS r, + | LATERAL TABLE (Prices(o.rowtime)) AS p + |WHERE r.currency = o.currency AND p.productId = o.productId + |""".stripMargin + + val ordersData = new mutable.MutableList[(Long, String, String, Timestamp)] + ordersData.+=((2L, "A1", "Euro", new Timestamp(2L))) + ordersData.+=((1L, "A2", "US Dollar", new Timestamp(3L))) + ordersData.+=((50L, "A4", "Yen", new Timestamp(4L))) + ordersData.+=((3L, "A2", "Euro", new Timestamp(5L))) + + val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)] + ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L))) + ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L))) + ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L))) + ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L))) + ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L))) + + val pricesHistoryData = new mutable.MutableList[(String, Double, Timestamp)] + pricesHistoryData.+=(("A2", 10.2D, new Timestamp(1L))) + pricesHistoryData.+=(("A1", 11.4D, new Timestamp(1L))) + pricesHistoryData.+=(("A4", 1D, new Timestamp(1L))) + pricesHistoryData.+=(("A1", 11.6D, new Timestamp(5L))) + pricesHistoryData.+=(("A1", 11.9D, new Timestamp(7L))) + + val orders = env + .fromCollection(ordersData) + .asInstanceOf[DataStream[Product]] + .assignTimestampsAndWatermarks(new TimestampExtractor()) + 
.toTable(tEnv, 'amount, 'productId, 'currency, 'rowtime.rowtime) + val ratesHistory = env + .fromCollection(ratesHistoryData) + .asInstanceOf[DataStream[Product]] + .assignTimestampsAndWatermarks(new TimestampExtractor()) + .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime) + val pricesHistory = env + .fromCollection(pricesHistoryData) + .asInstanceOf[DataStream[Product]] + .assignTimestampsAndWatermarks(new TimestampExtractor()) + .toTable(tEnv, 'productId, 'price, 'rowtime.rowtime) + + tEnv.createTemporaryView("Orders", orders) + tEnv.createTemporaryView("RatesHistory", ratesHistory) + tEnv.registerFunction( + "Rates", + ratesHistory.createTemporalTableFunction("rowtime", "currency")) + tEnv.registerFunction( + "Prices", + pricesHistory.createTemporalTableFunction("rowtime", "productId")) + + tEnv.createTemporaryView("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) + + // Scan from registered table to test for interplay between + // LogicalCorrelateToTemporalTableJoinRule and TableScanRule + val result = tEnv.from("TemporalJoinResult").toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("1,102,10.2", "3,116,10.2", "2,114,11.4", "50,1,1.0") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } } -class TimestampExtractor[T1, T2] - extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { - override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { - element._3.getTime +class TimestampExtractor + extends BoundedOutOfOrdernessTimestampExtractor[Product](Time.seconds(10)) { + override def extractTimestamp(element: Product): Long = element match { + case (_, _, ts: Timestamp) => ts.getTime + case (_, _, _, ts: Timestamp) => ts.getTime Review comment: add a default branch: ``` case _ => throw new IllegalArgumentException("Expected the last element in a tuple to be of a Timestamp type.") ``` This will make it easier to use this extractor in future tests. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services