Hequn Cheng created FLINK-11916:
-----------------------------------
Summary: Join with a Temporal Table should throw exception for
left join
Key: FLINK-11916
URL: https://issues.apache.org/jira/browse/FLINK-11916
Project: Flink
Issue Type: Bug
Components: API / Table SQL
Reporter: Hequn Cheng
InĀ {{TemporalJoinITCase.testProcessTimeInnerJoin}}, if we change the inner join
to left join the test works fine. We may need to throw an exception if we only
support inner join.
CC [~pnowojski]
The problem can be reproduced with the following sql:
{code:java}
@Test
def testEventTimeInnerJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
env.setStateBackend(getStateBackend)
StreamITCase.clear
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val sqlQuery =
"""
|SELECT
| o.amount * r.rate AS amount
|FROM
| Orders AS o left join
| LATERAL TABLE (Rates(o.rowtime)) AS r on true
|WHERE r.currency = o.currency
|""".stripMargin
val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
ordersData.+=((2L, "Euro", new Timestamp(2L)))
ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
ordersData.+=((50L, "Yen", new Timestamp(4L)))
ordersData.+=((3L, "Euro", new Timestamp(5L)))
val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
var expectedOutput = new mutable.HashSet[String]()
expectedOutput += (2 * 114).toString
expectedOutput += (3 * 116).toString
val orders = env
.fromCollection(ordersData)
.assignTimestampsAndWatermarks(new TimestampExtractor[Long, String]())
.toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
val ratesHistory = env
.fromCollection(ratesHistoryData)
.assignTimestampsAndWatermarks(new TimestampExtractor[String, Long]())
.toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
tEnv.registerTable("Orders", orders)
tEnv.registerTable("RatesHistory", ratesHistory)
tEnv.registerTable("FilteredRatesHistory",
tEnv.scan("RatesHistory").filter('rate > 110L))
tEnv.registerFunction(
"Rates",
tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime,
'currency))
tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
// Scan from registered table to test for interplay between
// LogicalCorrelateToTemporalTableJoinRule and TableScanRule
val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
assertEquals(expectedOutput, StreamITCase.testResults.toSet)
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)