Matt Cuento created FLINK-38866:
-----------------------------------
Summary: TIMESTAMP_LTZ(6) values with UNION ALL into window TVF produce incorrect results in batch mode
Key: FLINK-38866
URL: https://issues.apache.org/jira/browse/FLINK-38866
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 2.1.1, 2.0.1, 1.19.3
Reporter: Matt Cuento
Window operators hard-code a precision of 3 when extracting the timestamp from the column referenced by the window's DESCRIPTOR. In streaming mode, the descriptor column must be a watermark (time) attribute, and watermark attributes are validated to have a precision of at most 3.
In batch mode there are no watermarks, and for a small subset of windowed queries the precision of the descriptor column is not validated. As a result, queries whose descriptor column has a precision greater than 3 can produce incorrect results.
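For example, the following query produces incorrect results. Note that CAST(... AS TIMESTAMP_LTZ) without an explicit precision yields TIMESTAMP_LTZ(6), as the plan below shows: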
{code:sql}
SELECT
  *
FROM
  (
    WITH values_table AS (
      SELECT
        CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
      UNION ALL
      SELECT
        CAST('2024-01-01 10:05:00' AS TIMESTAMP_LTZ) AS event_time
      UNION ALL
      SELECT
        CAST('2024-01-01 10:10:00' AS TIMESTAMP_LTZ) AS event_time
    )
    SELECT
      window_start,
      window_end
    FROM
      TABLE(
        HOP(
          TABLE values_table,
          DESCRIPTOR(event_time),
          INTERVAL '1' MINUTES,
          INTERVAL '2' MINUTES
        )
      )
    GROUP BY
      window_start,
      window_end
  );
{code}
Expected:
{code:java}
+I[2024-01-01T09:59, 2024-01-01T10:01]
+I[2024-01-01T10:00, 2024-01-01T10:02]
+I[2024-01-01T10:04, 2024-01-01T10:06]
+I[2024-01-01T10:05, 2024-01-01T10:07]
+I[2024-01-01T10:09, 2024-01-01T10:11]
+I[2024-01-01T10:10, 2024-01-01T10:12] {code}
Actual:
{code:java}
+I[1972-03-06T00:43, 1972-03-06T00:45]
+I[1972-03-06T00:44, 1972-03-06T00:46] {code}
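For what it's worth, the bogus 1972 values are consistent with one specific failure mode; this is only a hypothesis for the root-cause question below, not a confirmed diagnosis. In Flink's BinaryRowData, a TIMESTAMP with precision > 3 is stored non-compactly: the milliseconds go into the variable-length part and the 8-byte fixed slot holds (offset << 32) | nanoOfMillisecond. If that slot is read back with the hard-coded precision of 3, the whole slot is treated as epoch milliseconds. Assuming a one-field row (8-byte null-bit header plus one 8-byte slot), the variable-length part starts at byte 16, so every row would decode to the same 16L << 32 ms, which would also explain why all three rows collapse into the same two windows. The sketch below does that arithmetic with java.time. The UTC-8 session zone is inferred from the plan (10:00 local becomes 18:00), and `hopWindowStarts` is a simplified, hypothetical HOP assignment, not Flink's code.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

public class MisreadTimestampDemo {
    static final long MINUTE_MS = 60_000L;

    // Assumed non-compact TIMESTAMP(p > 3) layout: the fixed slot holds
    // (offset of the millis in the variable-length part << 32) | nanoOfMillisecond.
    static long nonCompactSlot(long variablePartOffset, int nanoOfMillisecond) {
        return (variablePartOffset << 32) | nanoOfMillisecond;
    }

    // Hypothetical, simplified HOP assignment on wall-clock millis (no window offset):
    // returns the start of every window of length sizeMs that contains ts, ascending.
    static List<LocalDateTime> hopWindowStarts(LocalDateTime ts, long sizeMs, long slideMs) {
        long t = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long lastStart = Math.floorDiv(t, slideMs) * slideMs;
        List<LocalDateTime> starts = new ArrayList<>();
        for (long start = lastStart; start > t - sizeMs; start -= slideMs) {
            starts.add(0, LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC));
        }
        return starts;
    }

    public static void main(String[] args) {
        // One-field row: 8-byte null-bit header + one 8-byte slot, so offset 16 (assumption).
        long slot = nonCompactSlot(16L, 0);
        // A reader assuming precision 3 (compact) treats the slot as epoch milliseconds.
        Instant misread = Instant.ofEpochMilli(slot);
        // Session time zone assumed to be UTC-8, as implied by the plan's literals.
        LocalDateTime local = LocalDateTime.ofInstant(misread, ZoneOffset.ofHours(-8));
        System.out.println(slot);    // 68719476736
        System.out.println(misread); // 1972-03-06T08:44:36.736Z
        System.out.println(local);   // 1972-03-06T00:44:36.736
        // Window starts for the misread value: [1972-03-06T00:43, 1972-03-06T00:44]
        System.out.println(hopWindowStarts(local, 2 * MINUTE_MS, MINUTE_MS));
        // Window starts for the real first row: [2024-01-01T09:59, 2024-01-01T10:00]
        System.out.println(hopWindowStarts(LocalDateTime.of(2024, 1, 1, 10, 0), 2 * MINUTE_MS, MINUTE_MS));
    }
}
```

The misread windows match the "Actual" rows exactly, and the correctly read value matches the first two "Expected" rows.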
Explain:
{code:java}
== Abstract Syntax Tree ==
LogicalProject(window_start=[$0], window_end=[$1])
+- LogicalAggregate(group=[{0, 1}])
   +- LogicalProject(window_start=[$1], window_end=[$2])
      +- LogicalTableFunctionScan(invocation=[HOP(TABLE(#0), DESCRIPTOR(_UTF-16LE'event_time'), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) window_time)])
         +- LogicalProject(event_time=[$0])
            +- LogicalUnion(all=[true])
               :- LogicalUnion(all=[true])
               :  :- LogicalProject(event_time=[2024-01-01 18:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
               :  :  +- LogicalValues(tuples=[[{ 0 }]])
               :  +- LogicalProject(event_time=[2024-01-01 18:05:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
               :     +- LogicalValues(tuples=[[{ 0 }]])
               +- LogicalProject(event_time=[2024-01-01 18:10:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
                  +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
+- Sort(orderBy=[window_start ASC, window_end ASC])
   +- Exchange(distribution=[hash[window_start, window_end]])
      +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
         +- Sort(orderBy=[window_start ASC, window_end ASC])
            +- Calc(select=[window_start, window_end])
               +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
                  +- Union(all=[true], union=[event_time])
                     :- Union(all=[true], union=[event_time])
                     :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
                     :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])
                     :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
                     :     +- Values(tuples=[[{ 0 }]], values=[ZERO])
                     +- Calc(select=[2024-01-01 18:10:00 AS event_time])
                        +- Values(tuples=[[{ 0 }]], values=[ZERO])

== Optimized Execution Plan ==
SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[window_start ASC, window_end ASC])
      +- Exchange(distribution=[hash[window_start, window_end]])
         +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
            +- Exchange(distribution=[forward])
               +- Sort(orderBy=[window_start ASC, window_end ASC])
                  +- Calc(select=[window_start, window_end])
                     +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
                        +- Union(all=[true], union=[event_time])
                           :- Union(all=[true], union=[event_time])
                           :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
                           :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])(reuse_id=[1])
                           :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
                           :     +- Reused(reference_id=[1])
                           +- Calc(select=[2024-01-01 18:10:00 AS event_time])
                              +- Reused(reference_id=[1])
{code}
Interestingly, the query produces correct results without the unions, i.e. when values_table contains a single row:
{code:sql}
WITH values_table AS (
  SELECT CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
)
{code}
Likewise, selecting the same rows from a catalog table, instead of from the UNION ALL CTE, produces correct results.
As part of this issue, I'd like to add validation in the planner, when the window strategy is created, that the precision of the descriptor column is at most 3. Separately, I'd appreciate help root-causing what goes wrong with the union in this particular query, given that the non-unioned query produces correct windows.
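A minimal sketch of the proposed rule, using hypothetical names (`WindowTimePrecisionValidation`, `validateTimeAttributePrecision`, `MAX_WINDOW_TIME_PRECISION`) that do not exist in Flink. It only illustrates the check itself; an actual planner change would presumably raise Flink's `ValidationException` and take the precision from the descriptor column's logical type at the point where the window strategy is created.

```java
public final class WindowTimePrecisionValidation {
    // Hypothetical constant: the precision window operators currently assume when
    // reading the DESCRIPTOR column (the hard-coded 3 described in this issue).
    static final int MAX_WINDOW_TIME_PRECISION = 3;

    private WindowTimePrecisionValidation() {}

    // Hypothetical planner-side check: fail fast at planning time instead of letting
    // the window operator decode a higher-precision timestamp with precision 3.
    static void validateTimeAttributePrecision(String columnName, int precision) {
        if (precision > MAX_WINDOW_TIME_PRECISION) {
            throw new IllegalArgumentException(String.format(
                    "Window time attribute '%s' has precision %d, but window operators "
                            + "currently support a precision of at most %d.",
                    columnName, precision, MAX_WINDOW_TIME_PRECISION));
        }
    }

    public static void main(String[] args) {
        // Precision 3 is accepted.
        validateTimeAttributePrecision("event_time", 3);
        try {
            // TIMESTAMP_LTZ without an explicit precision is TIMESTAMP_LTZ(6), as in the query above.
            validateTimeAttributePrecision("event_time", 6);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting the query during planning surfaces the problem before any job runs, rather than silently producing wrong windows.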