[ https://issues.apache.org/jira/browse/FLINK-38866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18049983#comment-18049983 ]
Matt Cuento commented on FLINK-38866:
-------------------------------------
It appears that this only occurs with UNION ALL because we are then forced to
use `BinaryRowData` instead of `GenericRowData`. `BinaryRowData` uses a
separate, non-compact representation for timestamps with precision > 3.
So the true scope of this bug is any batch-mode query in which a
`BinaryRowData` carrying a timestamp with precision > 3 is passed to a window
function.
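The following minimal sketch reproduces the Actual output in the quoted issue under three assumptions: a non-compact timestamp field in `BinaryRowData` stores `(offset << 32) | nanoOfMillisecond` in its 8-byte fixed-length slot, with the milliseconds in the variable-length part; each row has a single field, so the variable-length part starts at byte 16 (an 8-byte null-bit header plus one 8-byte slot); and the session time zone is UTC-8, inferred from the plan rendering 10:00 as 18:00. Reading that slot as epoch milliseconds, as precision 3 would, and assigning 2-minute HOP windows with a 1-minute slide yields the two 1972 windows. The class and helper names are hypothetical.

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

public class NonCompactTimestampMisread {

    // Assumed session time zone, inferred from the plan (10:00 local shown as 18:00).
    static final ZoneOffset SESSION_ZONE = ZoneOffset.ofHours(-8);

    /** Assumed fixed-length slot of a non-compact timestamp: (offset << 32) | nanoOfMillisecond. */
    static long nonCompactSlot(int varPartOffset, int nanoOfMillisecond) {
        return ((long) varPartOffset << 32) | nanoOfMillisecond;
    }

    /** All HOP windows [start, end) containing tsMillis; sizeMillis must be a multiple of slideMillis. */
    static List<long[]> hopWindows(long tsMillis, long sizeMillis, long slideMillis) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = Math.floorDiv(tsMillis, slideMillis) * slideMillis;
        for (long start = lastStart - sizeMillis + slideMillis; start <= lastStart; start += slideMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }

    static LocalDateTime local(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(SESSION_ZONE).toLocalDateTime();
    }

    static void printHopWindows(long tsMillis) {
        // HOP(..., INTERVAL '1' MINUTES /* slide */, INTERVAL '2' MINUTES /* size */)
        for (long[] w : hopWindows(tsMillis, 120_000L, 60_000L)) {
            System.out.println("+I[" + local(w[0]) + ", " + local(w[1]) + "]");
        }
    }

    public static void main(String[] args) {
        // One-field row: 8-byte null-bit header + one 8-byte slot, so the variable part starts at 16.
        long misread = nonCompactSlot(16, 0); // slot interpreted as epoch millis (precision 3)
        long correct = 1_704_132_000_000L;    // 2024-01-01T18:00:00Z, i.e. 10:00 at UTC-8
        System.out.println("misread: " + local(misread));
        printHopWindows(misread);
        System.out.println("correct: " + local(correct));
        printHopWindows(correct);
    }
}
{code}

Running it prints `misread: 1972-03-06T00:44:36.736` followed by `+I[1972-03-06T00:43, 1972-03-06T00:45]` and `+I[1972-03-06T00:44, 1972-03-06T00:46]`, then `correct: 2024-01-01T10:00` with the expected windows `[09:59, 10:01]` and `[10:00, 10:02]`. Because every single-field row places its variable-length part at the same offset, all three input rows would collapse onto the same bogus timestamp, which is consistent with only two windows appearing.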
Rather than strictly validating against this, we can pass the timestamp
precision into the window operator implementations so that `getTimestamp()` is
called with the column's actual precision.
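To illustrate that alternative, here is a minimal, self-contained model of the idea. `MiniBinaryRow` is a hypothetical, heavily simplified stand-in for `BinaryRowData` (byte order, null bits, and the real `TimestampData` API are ignored), and `TimeColumnReader` is a hypothetical component a window operator could be constructed with, carrying the time column's precision from the planner instead of a hardcoded 3.

{code:java}
import java.nio.ByteBuffer;

public class PrecisionAwareWindowSketch {

    /** Hypothetical, simplified model of the BinaryRowData layout (byte order and nulls ignored). */
    static final class MiniBinaryRow {
        static final int NULL_BITS_BYTES = 8; // assumed null-bit header size for a small arity
        final ByteBuffer buf = ByteBuffer.allocate(64);
        int cursor; // next free byte in the variable-length part

        MiniBinaryRow(int arity) {
            this.cursor = NULL_BITS_BYTES + 8 * arity;
        }

        static int fieldOffset(int pos) {
            return NULL_BITS_BYTES + 8 * pos;
        }

        void writeTimestamp(int pos, long millis, int nanoOfMillisecond, int precision) {
            if (precision <= 3) {
                buf.putLong(fieldOffset(pos), millis); // compact: millis stored inline
            } else {
                buf.putLong(cursor, millis); // non-compact: millis go to the variable part
                buf.putLong(fieldOffset(pos), ((long) cursor << 32) | nanoOfMillisecond);
                cursor += 8;
            }
        }

        /** Returns epoch millis, interpreting the fixed slot according to the given precision. */
        long getTimestampMillis(int pos, int precision) {
            long slot = buf.getLong(fieldOffset(pos));
            if (precision <= 3) {
                return slot;
            }
            int offset = (int) (slot >>> 32);
            return buf.getLong(offset);
        }
    }

    /** Hypothetical time-column reader that a window operator would be constructed with. */
    static final class TimeColumnReader {
        final int timeFieldIndex;
        final int timestampPrecision; // passed down from the planner instead of a hardcoded 3

        TimeColumnReader(int timeFieldIndex, int timestampPrecision) {
            this.timeFieldIndex = timeFieldIndex;
            this.timestampPrecision = timestampPrecision;
        }

        long read(MiniBinaryRow row) {
            return row.getTimestampMillis(timeFieldIndex, timestampPrecision);
        }
    }

    public static void main(String[] args) {
        MiniBinaryRow row = new MiniBinaryRow(1);
        row.writeTimestamp(0, 1_704_132_000_000L, 0, 6); // a TIMESTAMP_LTZ(6) value
        System.out.println("hardcoded precision 3: " + new TimeColumnReader(0, 3).read(row));
        System.out.println("declared precision 6:  " + new TimeColumnReader(0, 6).read(row));
    }
}
{code}

With the hardcoded precision of 3, the reader returns the raw slot value `68719476736`; with the declared precision of 6, it follows the offset and returns `1704132000000` (2024-01-01T18:00:00Z).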
> TIMESTAMP_LTZ(6) values with UNION ALL into window TVF produce incorrect
> results in batch mode
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-38866
> URL: https://issues.apache.org/jira/browse/FLINK-38866
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.19.3, 2.0.1, 2.1.1
> Reporter: Matt Cuento
> Priority: Minor
> Labels: pull-request-available
>
> Window operators use a hardcoded default precision of 3 when extracting the
> timestamp from the descriptor column. In streaming mode, the descriptor must
> be a watermark attribute, and watermark attributes are validated to have a
> precision no greater than 3.
> In batch mode there are no watermarks, and for a small subset of windowed
> queries we lack validation on the descriptor's precision. As a result,
> windowing on timestamp columns with precision > 3 produces incorrect
> results.
> For example, this query produces incorrect results:
>
> {code:sql}
> SELECT
>   *
> FROM
>   (
>     WITH values_table AS (
>       SELECT
>         CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
>       UNION ALL
>       SELECT
>         CAST('2024-01-01 10:05:00' AS TIMESTAMP_LTZ) AS event_time
>       UNION ALL
>       SELECT
>         CAST('2024-01-01 10:10:00' AS TIMESTAMP_LTZ) AS event_time
>     )
>     SELECT
>       window_start,
>       window_end
>     FROM
>       TABLE(
>         HOP(
>           TABLE values_table,
>           DESCRIPTOR(event_time),
>           INTERVAL '1' MINUTES,
>           INTERVAL '2' MINUTES
>         )
>       )
>     GROUP BY
>       window_start,
>       window_end
>   ); {code}
>
> Expected:
> {code:java}
> +I[2024-01-01T09:59, 2024-01-01T10:01]
> +I[2024-01-01T10:00, 2024-01-01T10:02]
> +I[2024-01-01T10:04, 2024-01-01T10:06]
> +I[2024-01-01T10:05, 2024-01-01T10:07]
> +I[2024-01-01T10:09, 2024-01-01T10:11]
> +I[2024-01-01T10:10, 2024-01-01T10:12] {code}
> Actual:
> {code:java}
> +I[1972-03-06T00:43, 1972-03-06T00:45]
> +I[1972-03-06T00:44, 1972-03-06T00:46] {code}
>
> Explain:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(window_start=[$0], window_end=[$1])
> +- LogicalAggregate(group=[{0, 1}])
>    +- LogicalProject(window_start=[$1], window_end=[$2])
>       +- LogicalTableFunctionScan(invocation=[HOP(TABLE(#0), DESCRIPTOR(_UTF-16LE'event_time'), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) window_time)])
>          +- LogicalProject(event_time=[$0])
>             +- LogicalUnion(all=[true])
>                :- LogicalUnion(all=[true])
>                :  :- LogicalProject(event_time=[2024-01-01 18:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
>                :  :  +- LogicalValues(tuples=[[{ 0 }]])
>                :  +- LogicalProject(event_time=[2024-01-01 18:05:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
>                :     +- LogicalValues(tuples=[[{ 0 }]])
>                +- LogicalProject(event_time=[2024-01-01 18:10:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
>                   +- LogicalValues(tuples=[[{ 0 }]])
>
> == Optimized Physical Plan ==
> SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
> +- Sort(orderBy=[window_start ASC, window_end ASC])
>    +- Exchange(distribution=[hash[window_start, window_end]])
>       +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
>          +- Sort(orderBy=[window_start ASC, window_end ASC])
>             +- Calc(select=[window_start, window_end])
>                +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
>                   +- Union(all=[true], union=[event_time])
>                      :- Union(all=[true], union=[event_time])
>                      :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
>                      :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])
>                      :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
>                      :     +- Values(tuples=[[{ 0 }]], values=[ZERO])
>                      +- Calc(select=[2024-01-01 18:10:00 AS event_time])
>                         +- Values(tuples=[[{ 0 }]], values=[ZERO])
>
> == Optimized Execution Plan ==
> SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
> +- Exchange(distribution=[forward])
>    +- Sort(orderBy=[window_start ASC, window_end ASC])
>       +- Exchange(distribution=[hash[window_start, window_end]])
>          +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
>             +- Exchange(distribution=[forward])
>                +- Sort(orderBy=[window_start ASC, window_end ASC])
>                   +- Calc(select=[window_start, window_end])
>                      +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
>                         +- Union(all=[true], union=[event_time])
>                            :- Union(all=[true], union=[event_time])
>                            :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
>                            :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])(reuse_id=[1])
>                            :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
>                            :     +- Reused(reference_id=[1])
>                            +- Calc(select=[2024-01-01 18:10:00 AS event_time])
>                               +- Reused(reference_id=[1])
> {code}
>
> Interestingly, the query produces correct results without unions, i.e. with
> a single row:
> {code:java}
> WITH values_table AS (
> SELECT CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
> ) {code}
> Similarly, selecting from a catalog table produces correct results.
> As part of this bug, I'd like to add planner validation that the precision is
> <= 3 when creating a window strategy. Separately, I'd appreciate help
> root-causing what goes wrong in the union operator for this particular query,
> given that the non-unioned query produces correct windows.
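The planner-side validation proposed in the issue might look roughly like the following minimal sketch. The class and method names are hypothetical, and it throws `IllegalArgumentException` only to stay self-contained; a real check in the Flink planner would presumably raise the planner's own validation exception. It applies the same limit that watermark attributes already enforce in streaming mode.

{code:java}
public class WindowTimePrecisionCheck {

    // Precision that window operators currently assume when reading the time column.
    static final int MAX_SUPPORTED_PRECISION = 3;

    /** Hypothetical planner-side check run when creating a window strategy. */
    static void validateWindowTimeColumn(String column, String typeName, int precision) {
        if (precision > MAX_SUPPORTED_PRECISION) {
            throw new IllegalArgumentException(String.format(
                    "Window time column '%s' has type %s(%d); precision greater than %d is not supported.",
                    column, typeName, precision, MAX_SUPPORTED_PRECISION));
        }
    }

    public static void main(String[] args) {
        validateWindowTimeColumn("event_time", "TIMESTAMP_LTZ", 3); // accepted silently
        try {
            validateWindowTimeColumn("event_time", "TIMESTAMP_LTZ", 6); // the case from this issue
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

Compared with passing the precision into the window operators, as suggested in the comment above, this approach rejects a TIMESTAMP_LTZ(6) time column outright instead of supporting it.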
--
This message was sent by Atlassian Jira
(v8.20.10#820010)