[ https://issues.apache.org/jira/browse/FLINK-38866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18049983#comment-18049983 ]

Matt Cuento commented on FLINK-38866:
-------------------------------------

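It appears that this only occurs with UNION ALL because we are then forced to
use `BinaryRowData` instead of `GenericRowData`. `BinaryRowData` stores
timestamps with precision > 3 in a separate, non-compact representation: the
epoch milliseconds go into the variable-length section, and the fixed-length
slot holds only the offset and the nanosecond-of-millisecond. `GenericRowData`
holds the `TimestampData` object directly, so the precision passed to
`getTimestamp()` makes no difference there.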
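
To make the failure mode concrete, here is a stdlib-only Java model of that fixed-length slot, based on my reading of `AbstractBinaryWriter.writeTimestamp` and `BinaryRowData.getTimestamp`. The class and method names are hypothetical, and the variable-length offset of 16 is an assumption for a single-field row (8 bytes of null bits plus one 8-byte slot).

```java
import java.time.Instant;

/** Hypothetical, stdlib-only model of how a BinaryRowData fixed-length slot holds a timestamp. */
final class TimestampSlotModel {

    /** Mirrors the compactness rule for TimestampData: precision <= 3 fits in a single long. */
    static boolean isCompact(int precision) {
        return precision <= 3;
    }

    /**
     * Value written into the fixed-length slot. Compact: the epoch millis themselves.
     * Non-compact: the millis live in the variable-length section at varOffset, and the
     * slot keeps (varOffset << 32) | nanoOfMillisecond.
     */
    static long slotValue(long epochMillis, int nanoOfMillisecond, int precision, int varOffset) {
        if (isCompact(precision)) {
            return epochMillis;
        }
        return ((long) varOffset << 32) | nanoOfMillisecond;
    }

    /** What a reader that hardcodes precision 3 sees: the raw slot interpreted as epoch millis. */
    static Instant readWithPrecision3(long slot) {
        return Instant.ofEpochMilli(slot);
    }

    public static void main(String[] args) {
        long millis = Instant.parse("2024-01-01T18:00:00Z").toEpochMilli();
        long slot = slotValue(millis, 0, 6, 16); // TIMESTAMP_LTZ(6), assumed offset 16
        System.out.println(readWithPrecision3(slot)); // 1972-03-06T08:44:36.736Z
    }
}
```

Under these assumptions the misread value no longer depends on the timestamp at all, so every row with a zero nanosecond-of-millisecond decodes to the same 1972 instant.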

So the real scope of this bug is any batch-mode query in which a
`BinaryRowData` reaches a window function whose time column has precision > 3.
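
As a sanity check on that explanation, the sketch below assigns HOP windows (size 2 min, slide 1 min) to an epoch-millisecond value, using the start alignment of Flink's `TimeWindow.getWindowStartWithOffset` with offset 0. `HopWindowSketch` is a hypothetical name, and the `America/Los_Angeles` session time zone is an assumption inferred from the plan rendering 10:00 local as 18:00 UTC. The misread input `16L << 32` likewise assumes a variable-length offset of 16.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: HOP window assignment over epoch millis, rendered in a session zone. */
final class HopWindowSketch {

    /**
     * Returns "[start, end]" strings for every HOP window containing tsMillis, latest start first.
     * Start alignment is timestamp - (timestamp + slide) % slide (offset 0), which equals
     * timestamp - floorMod(timestamp, slide) for non-negative timestamps.
     */
    static List<String> assign(long tsMillis, long sizeMillis, long slideMillis, ZoneId zone) {
        List<String> windows = new ArrayList<>();
        long lastStart = tsMillis - Math.floorMod(tsMillis, slideMillis);
        for (long start = lastStart; start > tsMillis - sizeMillis; start -= slideMillis) {
            windows.add("[" + local(start, zone) + ", " + local(start + sizeMillis, zone) + "]");
        }
        return windows;
    }

    private static LocalDateTime local(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles"); // assumed session time zone
        long size = 120_000L; // INTERVAL '2' MINUTES
        long slide = 60_000L; // INTERVAL '1' MINUTES
        long correct = Instant.parse("2024-01-01T18:00:00Z").toEpochMilli(); // 10:00 local
        long misread = 16L << 32; // raw slot read with precision 3 (assumed offset 16)
        System.out.println(assign(correct, size, slide, zone));
        System.out.println(assign(misread, size, slide, zone));
    }
}
```

The correct instant yields the two expected windows around 10:00, while the misread value yields exactly the two 1972-03-06 windows (00:43 and 00:44 starts) from the actual output.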

Rather than strictly rejecting such queries, we can pass the timestamp precision
into the window operator implementations so that `getTimestamp()` is called with
the column's actual precision.
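
A minimal sketch of that proposal, assuming the planner hands the descriptor column's precision to the operator at construction time. `TimestampRow`, `FakeBinaryRow` and `WindowTimeExtractor` are hypothetical stand-ins for `RowData#getTimestamp(int, int)`, `BinaryRowData`, and the time-extraction step inside a window operator, so the example runs without Flink on the classpath; it returns epoch millis rather than `TimestampData`.

```java
import java.util.Map;

/** Hypothetical stand-in for RowData#getTimestamp(int pos, int precision), returning millis. */
interface TimestampRow {
    long getTimestampMillis(int pos, int precision);
}

/** Hypothetical binary-style row: decoding depends on the precision the caller passes. */
final class FakeBinaryRow implements TimestampRow {
    private final long[] slots;               // fixed-length section, one long per field
    private final Map<Integer, Long> varPart; // variable-length section: offset -> epoch millis

    FakeBinaryRow(long[] slots, Map<Integer, Long> varPart) {
        this.slots = slots;
        this.varPart = varPart;
    }

    @Override
    public long getTimestampMillis(int pos, int precision) {
        long slot = slots[pos];
        if (precision <= 3) {
            return slot; // compact: the slot is the epoch millis
        }
        int offset = (int) (slot >>> 32); // non-compact: high 32 bits locate the millis
        return varPart.get(offset);       // nano-of-millisecond (low bits) ignored here
    }
}

/** Sketch of the proposed change: keep the descriptor's precision instead of hardcoding 3. */
final class WindowTimeExtractor {
    private final int timeFieldIndex;
    private final int timePrecision; // assumed to be supplied by the planner from the column type

    WindowTimeExtractor(int timeFieldIndex, int timePrecision) {
        this.timeFieldIndex = timeFieldIndex;
        this.timePrecision = timePrecision;
    }

    long extract(TimestampRow row) {
        return row.getTimestampMillis(timeFieldIndex, timePrecision);
    }
}
```

With precision 6 the extractor follows the offset to the stored value, while the hardcoded-3 path returns the raw slot. `GenericRowData` inputs keep working either way, since its `getTimestamp()` ignores the precision argument.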

> TIMESTAMP_LTZ(6) values with UNION ALL into window TVF produce incorrect 
> results in batch mode
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38866
>                 URL: https://issues.apache.org/jira/browse/FLINK-38866
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.19.3, 2.0.1, 2.1.1
>            Reporter: Matt Cuento
>            Priority: Minor
>              Labels: pull-request-available
>
> Window operators hardcode a precision of 3 when extracting the timestamp of
> the column named by the descriptor. In streaming mode, the descriptor must
> be a watermark attribute, and watermark attributes are validated to have a
> precision of at most 3.
> In batch mode there are no watermarks, and for a small subset of windowed
> queries the descriptor's precision is not validated. As a result, window
> queries on timestamp columns with precision > 3 can produce incorrect
> results.
> For example, this query produces incorrect results:
>  
> {code:java}
> SELECT
>   *
> FROM
>   (
>     WITH values_table AS (
>       SELECT
>         CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
>       UNION ALL
>       SELECT
>         CAST('2024-01-01 10:05:00' AS TIMESTAMP_LTZ) AS event_time
>       UNION ALL
>       SELECT
>         CAST('2024-01-01 10:10:00' AS TIMESTAMP_LTZ) AS event_time
>     )
>     SELECT
>       window_start,
>       window_end
>     FROM
>       TABLE(
>         HOP(
>           TABLE values_table,
>           DESCRIPTOR(event_time),
>           INTERVAL '1' MINUTES,
>           INTERVAL '2' MINUTES
>         )
>       )
>     GROUP BY
>       window_start,
>       window_end
>   ); {code}
>  
> Expected:
> {code:java}
> +I[2024-01-01T09:59, 2024-01-01T10:01]
> +I[2024-01-01T10:00, 2024-01-01T10:02]
> +I[2024-01-01T10:04, 2024-01-01T10:06]
> +I[2024-01-01T10:05, 2024-01-01T10:07]
> +I[2024-01-01T10:09, 2024-01-01T10:11]
> +I[2024-01-01T10:10, 2024-01-01T10:12] {code}
> Actual:
> {code:java}
> +I[1972-03-06T00:43, 1972-03-06T00:45], 
> +I[1972-03-06T00:44, 1972-03-06T00:46] {code}
>  
> Explain:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(window_start=[$0], window_end=[$1])
> +- LogicalAggregate(group=[{0, 1}])
>    +- LogicalProject(window_start=[$1], window_end=[$2])
>       +- LogicalTableFunctionScan(invocation=[HOP(TABLE(#0), DESCRIPTOR(_UTF-16LE'event_time'), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) window_time)])
>          +- LogicalProject(event_time=[$0])
>             +- LogicalUnion(all=[true])
>                :- LogicalUnion(all=[true])
>                :  :- LogicalProject(event_time=[2024-01-01 18:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
>                :  :  +- LogicalValues(tuples=[[{ 0 }]])
>                :  +- LogicalProject(event_time=[2024-01-01 18:05:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
>                :     +- LogicalValues(tuples=[[{ 0 }]])
>                +- LogicalProject(event_time=[2024-01-01 18:10:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
>                   +- LogicalValues(tuples=[[{ 0 }]])
> == Optimized Physical Plan ==
> SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
> +- Sort(orderBy=[window_start ASC, window_end ASC])
>    +- Exchange(distribution=[hash[window_start, window_end]])
>       +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
>          +- Sort(orderBy=[window_start ASC, window_end ASC])
>             +- Calc(select=[window_start, window_end])
>                +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
>                   +- Union(all=[true], union=[event_time])
>                      :- Union(all=[true], union=[event_time])
>                      :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
>                      :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])
>                      :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
>                      :     +- Values(tuples=[[{ 0 }]], values=[ZERO])
>                      +- Calc(select=[2024-01-01 18:10:00 AS event_time])
>                         +- Values(tuples=[[{ 0 }]], values=[ZERO])
> == Optimized Execution Plan ==
> SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
> +- Exchange(distribution=[forward])
>    +- Sort(orderBy=[window_start ASC, window_end ASC])
>       +- Exchange(distribution=[hash[window_start, window_end]])
>          +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
>             +- Exchange(distribution=[forward])
>                +- Sort(orderBy=[window_start ASC, window_end ASC])
>                   +- Calc(select=[window_start, window_end])
>                      +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
>                         +- Union(all=[true], union=[event_time])
>                            :- Union(all=[true], union=[event_time])
>                            :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
>                            :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])(reuse_id=[1])
>                            :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
>                            :     +- Reused(reference_id=[1])
>                            +- Calc(select=[2024-01-01 18:10:00 AS event_time])
>                               +- Reused(reference_id=[1])
>  {code}
>  
> What's interesting is that it doesn't produce incorrect results without 
> unions (using a single row):
> {code:java}
> WITH values_table AS ( 
>     SELECT CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
> ) {code}
> Similarly, selecting from a catalog table does not produce incorrect results.
> As part of this bug, I'd like to add validation in the planner, when creating
> a window strategy, that the precision is <= 3. Separately, I'd appreciate help
> root-causing what might be wrong in the union operator for this particular
> query, given that the non-unioned query produces correct windows.
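
For comparison, the planner-side validation mentioned in the issue amounts to a check like the following. `WindowPrecisionCheck` and its message are hypothetical; an actual planner change would presumably raise Flink's own `ValidationException` rather than the JDK `IllegalArgumentException` used here to keep the sketch dependency-free.

```java
/** Hypothetical planner-side guard: reject window time columns whose precision exceeds 3. */
final class WindowPrecisionCheck {
    static final int MAX_WINDOW_TIME_PRECISION = 3; // what the window operators currently assume

    static void validate(String column, int precision) {
        if (precision > MAX_WINDOW_TIME_PRECISION) {
            throw new IllegalArgumentException(
                    "Window time column '" + column + "' has precision " + precision
                            + ", but window operators currently support at most "
                            + MAX_WINDOW_TIME_PRECISION + ".");
        }
    }
}
```

Under this check, the reproducer's `event_time` column (TIMESTAMP_LTZ(6) in the plan) would be rejected up front instead of silently producing 1972 windows.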



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
