Matt Cuento created FLINK-38866:
-----------------------------------

             Summary: TIMESTAMP_LTZ(6) values with UNION ALL into window TVF produce incorrect results in batch mode
                 Key: FLINK-38866
                 URL: https://issues.apache.org/jira/browse/FLINK-38866
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 2.1.1, 2.0.1, 1.19.3
            Reporter: Matt Cuento


Window operators hardcode a precision of 3 when extracting the timestamp from
the column referenced by the window's DESCRIPTOR. In streaming mode, that
column must be a watermark (time) attribute, and watermark attributes are
validated to have a precision of at most 3.

In batch mode there are no watermarks, and for a small subset of windowed
queries nothing validates the precision of the DESCRIPTOR column. As a result,
windowing on a timestamp column with precision greater than 3 can produce
incorrect results.
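
A minimal sketch of why a reader hardcoded to precision 3 goes wrong, assuming Flink's BinaryRowData layout: timestamps with precision <= 3 are "compact" and stored as epoch millis in the field's 8-byte fixed-length slot, while higher precisions store (offset << 32) | nanoOfMillisecond in that slot and keep the millis in the variable-length part of the row. The names TimestampSlotModel, writeSlot and readAsPrecision3 are illustrative, not Flink APIs, and the offset of 16 for a single-field row is an assumption.

{code:java}
import java.time.Instant;

/**
 * Hypothetical, simplified model of a BinaryRowData-style fixed-length slot.
 * Not Flink code: it only mirrors the compact / non-compact timestamp rule.
 */
final class TimestampSlotModel {

    /** Compact timestamps (precision <= 3) fit in the 8-byte slot as epoch millis. */
    static boolean isCompact(int precision) {
        return precision <= 3;
    }

    /** Slot content: millis if compact, else (variable-part offset << 32) | nanoOfMillisecond. */
    static long writeSlot(long epochMillis, int nanoOfMillisecond, int precision, int varOffset) {
        if (isCompact(precision)) {
            return epochMillis;
        }
        // Non-compact: the millis would live at varOffset in the variable-length part;
        // the slot only holds a pointer plus the sub-millisecond nanos.
        return ((long) varOffset << 32) | nanoOfMillisecond;
    }

    /** A reader hardcoded to precision 3 interprets any slot as epoch millis. */
    static Instant readAsPrecision3(long slot) {
        return Instant.ofEpochMilli(slot);
    }

    public static void main(String[] args) {
        long millis = Instant.parse("2024-01-01T18:00:00Z").toEpochMilli();
        int varOffset = 16; // assumed: 8-byte header/null bits + one 8-byte field slot
        System.out.println(readAsPrecision3(writeSlot(millis, 0, 3, varOffset)));
        System.out.println(readAsPrecision3(writeSlot(millis, 0, 6, varOffset)));
    }
}
{code}

Running main prints the correct instant for the precision-3 write and an instant in 1972 for the precision-6 write.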

For example, this query produces incorrect results in batch mode:

 
{code:java}
SELECT
  *
FROM
  (
    WITH values_table AS (
      SELECT
        CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
      UNION ALL
      SELECT
        CAST('2024-01-01 10:05:00' AS TIMESTAMP_LTZ) AS event_time
      UNION ALL
      SELECT
        CAST('2024-01-01 10:10:00' AS TIMESTAMP_LTZ) AS event_time
    )
    SELECT
      window_start,
      window_end
    FROM
      TABLE(
        HOP(
          TABLE values_table,
          DESCRIPTOR(event_time),
          INTERVAL '1' MINUTES,
          INTERVAL '2' MINUTES
        )
      )
    GROUP BY
      window_start,
      window_end
  ); {code}
 

Expected:
{code:java}
+I[2024-01-01T09:59, 2024-01-01T10:01]
+I[2024-01-01T10:00, 2024-01-01T10:02]
+I[2024-01-01T10:04, 2024-01-01T10:06]
+I[2024-01-01T10:05, 2024-01-01T10:07]
+I[2024-01-01T10:09, 2024-01-01T10:11]
+I[2024-01-01T10:10, 2024-01-01T10:12] {code}
Actual:
{code:java}
+I[1972-03-06T00:43, 1972-03-06T00:45], 
+I[1972-03-06T00:44, 1972-03-06T00:46] {code}
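
The actual windows are consistent with one unconfirmed hypothesis: the window operator reads the fixed-length slot of a non-compact TIMESTAMP_LTZ(6) value, which holds (offset << 32) | nanoOfMillisecond, as epoch millis. With an assumed offset of 16 and zero nanos, that slot is 2^36 = 68,719,476,736 ms, i.e. 1972-03-06T08:44:36.736Z, or 00:44:36.736 at UTC-8. The sketch below computes HOP windows (size 2 min, slide 1 min) for both a correctly decoded event time and that misread value. The session time zone America/Los_Angeles is an assumption inferred from the plan printing the 10:00 literal as 18:00, and HopWindowCheck is a hypothetical helper, not Flink's window assigner.

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: HOP window assignment on local wall-clock time. */
final class HopWindowCheck {

    // Assumption: session time zone inferred from the plan (10:00 local shown as 18:00 UTC).
    static final ZoneId SESSION_ZONE = ZoneId.of("America/Los_Angeles");
    static final long SIZE_MS = 2 * 60_000L;  // INTERVAL '2' MINUTES
    static final long SLIDE_MS = 60_000L;     // INTERVAL '1' MINUTES

    /** Returns every HOP window containing the event as "[start, end)" in local time. */
    static List<String> hopWindows(Instant eventTime) {
        // Shift to local wall-clock millis; a whole-minute zone offset keeps minute alignment.
        long local = LocalDateTime.ofInstant(eventTime, SESSION_ZONE)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        List<String> windows = new ArrayList<>();
        long lastStart = local - Math.floorMod(local, SLIDE_MS);
        for (long start = lastStart; start > local - SIZE_MS; start -= SLIDE_MS) {
            windows.add("[" + format(start) + ", " + format(start + SIZE_MS) + ")");
        }
        Collections.reverse(windows); // earliest window first
        return windows;
    }

    private static String format(long localMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(localMillis), ZoneOffset.UTC).toString();
    }

    public static void main(String[] args) {
        // Correctly decoded first event: 10:00 local == 18:00 UTC.
        System.out.println(hopWindows(Instant.parse("2024-01-01T18:00:00Z")));
        // Hypothesis: slot value (16 << 32) | 0 read as epoch millis.
        System.out.println(hopWindows(Instant.ofEpochMilli(16L << 32)));
    }
}
{code}

For 10:00 local it returns [2024-01-01T09:59, 2024-01-01T10:01) and [2024-01-01T10:00, 2024-01-01T10:02), matching the first expected rows; for the misread value it returns [1972-03-06T00:43, 1972-03-06T00:45) and [1972-03-06T00:44, 1972-03-06T00:46), matching the actual rows. If all three unioned rows share the same offset, all three events would collapse into these two windows, which would explain why only two rows come back.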
 

Explain:
{code:java}
== Abstract Syntax Tree ==
LogicalProject(window_start=[$0], window_end=[$1])
+- LogicalAggregate(group=[{0, 1}])
   +- LogicalProject(window_start=[$1], window_end=[$2])
      +- LogicalTableFunctionScan(invocation=[HOP(TABLE(#0), DESCRIPTOR(_UTF-16LE'event_time'), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) event_time, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(6) window_time)])
         +- LogicalProject(event_time=[$0])
            +- LogicalUnion(all=[true])
               :- LogicalUnion(all=[true])
               :  :- LogicalProject(event_time=[2024-01-01 18:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
               :  :  +- LogicalValues(tuples=[[{ 0 }]])
               :  +- LogicalProject(event_time=[2024-01-01 18:05:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
               :     +- LogicalValues(tuples=[[{ 0 }]])
               +- LogicalProject(event_time=[2024-01-01 18:10:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)])
                  +- LogicalValues(tuples=[[{ 0 }]])

== Optimized Physical Plan ==
SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
+- Sort(orderBy=[window_start ASC, window_end ASC])
   +- Exchange(distribution=[hash[window_start, window_end]])
      +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
         +- Sort(orderBy=[window_start ASC, window_end ASC])
            +- Calc(select=[window_start, window_end])
               +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
                  +- Union(all=[true], union=[event_time])
                     :- Union(all=[true], union=[event_time])
                     :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
                     :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])
                     :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
                     :     +- Values(tuples=[[{ 0 }]], values=[ZERO])
                     +- Calc(select=[2024-01-01 18:10:00 AS event_time])
                        +- Values(tuples=[[{ 0 }]], values=[ZERO])

== Optimized Execution Plan ==
SortAggregate(isMerge=[true], groupBy=[window_start, window_end], select=[window_start, window_end])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[window_start ASC, window_end ASC])
      +- Exchange(distribution=[hash[window_start, window_end]])
         +- LocalSortAggregate(groupBy=[window_start, window_end], select=[window_start, window_end])
            +- Exchange(distribution=[forward])
               +- Sort(orderBy=[window_start ASC, window_end ASC])
                  +- Calc(select=[window_start, window_end])
                     +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
                        +- Union(all=[true], union=[event_time])
                           :- Union(all=[true], union=[event_time])
                           :  :- Calc(select=[2024-01-01 18:00:00 AS event_time])
                           :  :  +- Values(tuples=[[{ 0 }]], values=[ZERO])(reuse_id=[1])
                           :  +- Calc(select=[2024-01-01 18:05:00 AS event_time])
                           :     +- Reused(reference_id=[1])
                           +- Calc(select=[2024-01-01 18:10:00 AS event_time])
                              +- Reused(reference_id=[1])
 {code}
 

Interestingly, the query produces correct results when the CTE has a single row
(no UNION ALL):
{code:java}
-- Same query as above, with the CTE reduced to a single row (no UNION ALL)
WITH values_table AS (
    SELECT CAST('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time
) {code}
Likewise, the query produces correct results when the same rows are read from a
catalog table.

As part of this bug, I'd like to add planner validation that the window time
column's precision is <= 3 when creating a window strategy. Separately, I'd
appreciate help root-causing what goes wrong in the union operator for this
particular query, given that the non-unioned query produces correct windows.


