Dawid Wysakowicz created FLINK-34910:
----------------------------------------
Summary: Can not plan window join without projections
Key: FLINK-34910
URL: https://issues.apache.org/jira/browse/FLINK-34910
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
Fix For: 1.20.0
When running:
{code}
@Test
def testWindowJoinWithoutProjections(): Unit = {
val sql =
"""
|SELECT *
|FROM
| TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE)) AS L
|JOIN
| TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE)) AS R
|ON L.window_start = R.window_start AND L.window_end = R.window_end AND
L.a = R.a
""".stripMargin
util.verifyRelPlan(sql)
}
{code}
It fails with:
{code}
FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME_MATERIALIZE(proctime) AS
proctime, window_start, window_end, window_time, a0, b0, c0, rowtime0,
PROCTIME_MATERIALIZE(proctime0) AS proctime0, window_start0, window_end0,
window_time0])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{}])
:- FlinkLogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($3),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b,
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME*
window_time)])
: +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3,
1000:INTERVAL SECOND)])
: +- FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, rowtime])
+-
FlinkLogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR(CAST($3):TIMESTAMP(3)),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647)
b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME*
proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3)
*ROWTIME* window_time)])
+- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3,
1000:INTERVAL SECOND)])
+- FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, MyTable2]], fields=[a, b, c, rowtime])
Failed to get time attribute index from DESCRIPTOR(CAST($3):TIMESTAMP(3)). This
is a bug, please file a JIRA issue.
Please check the documentation for the set of currently supported SQL features.
{code}
In prior versions this had another problem of ambiguous {{rowtime}} column, but
this has been fixed by [FLINK-32648]. In versions < 1.19 WindowTableFunctions
were incorrectly scoped, because they were not extending from Calcite's
SqlWindowTableFunction and the scoping implemented in
SqlValidatorImpl#convertFrom was incorrect.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)