[ 
https://issues.apache.org/jira/browse/FLINK-38075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kaitian updated FLINK-38075:
----------------------------
    Description: 
nextTriggerWatermark is calculated using ltz time,

 
{code:java}
// code placeholder
public static long getNextTriggerWatermark(
            long currentWatermark,
            long interval,
            long windowOffset,
            ZoneId shiftTimezone,
            boolean useDayLightSaving) {
        if (currentWatermark == Long.MAX_VALUE) {
            return currentWatermark;
        }        long triggerWatermark;
        // consider the DST timezone
        if (useDayLightSaving) {
            long utcWindowStart =
                    getWindowStartWithOffset(
                            toUtcTimestampMills(currentWatermark, 
shiftTimezone),
                            windowOffset,
                            interval);
            triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 
1, shiftTimezone);
        } else {
            long start = getWindowStartWithOffset(currentWatermark, 
windowOffset, interval);
            triggerWatermark = start + interval - 1;
        } {code}
 

while a record's sliceEnd is calculated using UTC time (you can simply think of 
both as windowEnd):

 
{code:java}
// code placeholder
public final long assignSliceEnd(RowData element, ClockService clock) {
            final long timestamp;
            if (rowtimeIndex >= 0) {
                if (element.isNullAt(rowtimeIndex)) {
                    throw new RuntimeException(
                            "RowTime field should not be null,"
                                    + " please convert it to a non-null long 
value.");
                }
                // Precision for row timestamp is always 3
                TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
                timestamp = toUtcTimestampMills(rowTime.getMillisecond(), 
shiftTimeZone);
            } else {
                // in processing time mode
                timestamp = toUtcTimestampMills(clock.currentProcessingTime(), 
shiftTimeZone);
            }
            return assignSliceEnd(timestamp);
        } {code}
 

For a given timestamp time1, the record's windowEnd is calculated using UTC 
time, giving windowEnd1. LTZ time, however, has an offset; once that offset is 
taken into account, time1 is assigned to a different window, whose calculated 
end is windowEnd2. As a result, a watermark can set nextTriggerWatermark too 
large, so the window is not triggered in time (similar to the offset problem in 
https://issues.apache.org/jira/browse/FLINK-36664). By the time the window is 
finally triggered, its data has already expired, so the data is lost.

I added a SQL test to reproduce this problem:

input:

    row("2020-10-10 00:07:59", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"),
    row("2020-10-10 00:07:59", 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", 
"a"),
    row("2020-10-10 11:11:59", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"),
    row("2020-10-10 11:11:59", 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", 
"a"),

sql:

s"""
         |CREATE TABLE T3 (
         | `ts` ${if (useTimestampLtz) "BIGINT" else "STRING"},
         | `int` INT,
         | `double` DOUBLE,
         | `float` FLOAT,
         | `bigdec` DECIMAL(10, 2),
         | `string` STRING,
         | `name` STRING,
         | `rowtime` AS
         | ${if (useTimestampLtz) "TO_TIMESTAMP_LTZ(`ts`, 3)" else 
"TO_TIMESTAMP(`ts`)"},
         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
         |) WITH (
         | 'connector' = 'values',
         | 'data-id' = '$insertOnlyOffsetDataId',
         | 'failing-source' = 'false'
         |)
         |""".stripMargin

 

s"""
         |SELECT
         |  `name`
         |  ,window_start
         |  ,window_end
         |  ,COUNT(*)
         |  ,SUM(`bigdec`)
         |  ,MAX(`double`)
         |  ,MIN(`float`)
         |  -- agg function with data view does not support async state yet
         |  ${if (!enableAsyncState) aggFunctionsWithDataView else ""}
         |FROM TABLE($tvfFromClause)
         |GROUP BY `name`, window_start, window_end
         |""".stripMargin

 

expected:

"a,2020-10-09T08:00,2020-10-10T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",
"a,2020-10-10T08:00,2020-10-11T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",

 

The <utc> case passes the test, but the <ltz> case only produces:
"a,2020-10-10T08:00,2020-10-11T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",

  was:
nextTriggerWatermark is calculated using ltz time,

 
{code:java}
// code placeholder
public static long getNextTriggerWatermark(
            long currentWatermark,
            long interval,
            long windowOffset,
            ZoneId shiftTimezone,
            boolean useDayLightSaving) {
        if (currentWatermark == Long.MAX_VALUE) {
            return currentWatermark;
        }        long triggerWatermark;
        // consider the DST timezone
        if (useDayLightSaving) {
            long utcWindowStart =
                    getWindowStartWithOffset(
                            toUtcTimestampMills(currentWatermark, 
shiftTimezone),
                            windowOffset,
                            interval);
            triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 
1, shiftTimezone);
        } else {
            long start = getWindowStartWithOffset(currentWatermark, 
windowOffset, interval);
            triggerWatermark = start + interval - 1;
        } {code}
 

while record's sliceEnd is calculated using utc time; (you can simply think of 
both as windowEnd)

 
{code:java}
// code placeholder
public final long assignSliceEnd(RowData element, ClockService clock) {
            final long timestamp;
            if (rowtimeIndex >= 0) {
                if (element.isNullAt(rowtimeIndex)) {
                    throw new RuntimeException(
                            "RowTime field should not be null,"
                                    + " please convert it to a non-null long 
value.");
                }
                // Precision for row timestamp is always 3
                TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
                timestamp = toUtcTimestampMills(rowTime.getMillisecond(), 
shiftTimeZone);
            } else {
                // in processing time mode
                timestamp = toUtcTimestampMills(clock.currentProcessingTime(), 
shiftTimeZone);
            }
            return assignSliceEnd(timestamp);
        } {code}
 


For a time --time1, its windowEnd is calculated using utc time as windowEnd1; 
but ltz time has an offset, and after taking into account the offset, time1 
will be divided into another window, and the calculated value is windowEnd2
So a watermark may appear, setting the nextTriggerWatermark too large, so the 
window is not triggered in time (similar to the offset problem 
https://issues.apache.org/jira/browse/FLINK-36664). Later, when the window is 
triggered, the data has expired, and the data is lost


> [Window]Data loss when using ltz time.
> --------------------------------------
>
>                 Key: FLINK-38075
>                 URL: https://issues.apache.org/jira/browse/FLINK-38075
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.0.0, 1.20.1, 2.1.0
>            Reporter: kaitian
>            Priority: Major
>              Labels: window
>
> nextTriggerWatermark is calculated using ltz time,
>  
> {code:java}
> // code placeholder
> public static long getNextTriggerWatermark(
>             long currentWatermark,
>             long interval,
>             long windowOffset,
>             ZoneId shiftTimezone,
>             boolean useDayLightSaving) {
>         if (currentWatermark == Long.MAX_VALUE) {
>             return currentWatermark;
>         }        long triggerWatermark;
>         // consider the DST timezone
>         if (useDayLightSaving) {
>             long utcWindowStart =
>                     getWindowStartWithOffset(
>                             toUtcTimestampMills(currentWatermark, 
> shiftTimezone),
>                             windowOffset,
>                             interval);
>             triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval 
> - 1, shiftTimezone);
>         } else {
>             long start = getWindowStartWithOffset(currentWatermark, 
> windowOffset, interval);
>             triggerWatermark = start + interval - 1;
>         } {code}
>  
> while record's sliceEnd is calculated using utc time; (you can simply think 
> of both as windowEnd)
>  
> {code:java}
> // code placeholder
> public final long assignSliceEnd(RowData element, ClockService clock) {
>             final long timestamp;
>             if (rowtimeIndex >= 0) {
>                 if (element.isNullAt(rowtimeIndex)) {
>                     throw new RuntimeException(
>                             "RowTime field should not be null,"
>                                     + " please convert it to a non-null long 
> value.");
>                 }
>                 // Precision for row timestamp is always 3
>                 TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
>                 timestamp = toUtcTimestampMills(rowTime.getMillisecond(), 
> shiftTimeZone);
>             } else {
>                 // in processing time mode
>                 timestamp = 
> toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
>             }
>             return assignSliceEnd(timestamp);
>         } {code}
>  
> For a time --time1, its windowEnd is calculated using utc time as windowEnd1; 
> but ltz time has an offset, and after taking into account the offset, time1 
> will be divided into another window, and the calculated value is windowEnd2
> So a watermark may appear, setting the nextTriggerWatermark too large, so the 
> window is not triggered in time (similar to the offset problem 
> https://issues.apache.org/jira/browse/FLINK-36664). Later, when the window is 
> triggered, the data has expired, and the data is lost
> I added a sql test to reproduce this problem:
> input:
>     row("2020-10-10 00:07:59", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"),
>     row("2020-10-10 00:07:59", 2, 2d, 2f, new JBigDecimal("2.22"), 
> "Comment#1", "a"),
>     row("2020-10-10 11:11:59", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"),
>     row("2020-10-10 11:11:59", 2, 2d, 2f, new JBigDecimal("2.22"), 
> "Comment#1", "a"),
> sql:
> s"""
>          |CREATE TABLE T3 (
>          | `ts` ${if (useTimestampLtz) "BIGINT" else "STRING"},
>          | `int` INT,
>          | `double` DOUBLE,
>          | `float` FLOAT,
>          | `bigdec` DECIMAL(10, 2),
>          | `string` STRING,
>          | `name` STRING,
>          | `rowtime` AS
>          | ${if (useTimestampLtz) "TO_TIMESTAMP_LTZ(`ts`, 3)" else 
> "TO_TIMESTAMP(`ts`)"},
>          | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
>          |) WITH (
>          | 'connector' = 'values',
>          | 'data-id' = '$insertOnlyOffsetDataId',
>          | 'failing-source' = 'false'
>          |)
>          |""".stripMargin
>  
> s"""
>          |SELECT
>          |  `name`
>          |  ,window_start
>          |  ,window_end
>          |  ,COUNT(*)
>          |  ,SUM(`bigdec`)
>          |  ,MAX(`double`)
>          |  ,MIN(`float`)
>          |  -- agg function with data view does not support async state yet
>          |  ${if (!enableAsyncState) aggFunctionsWithDataView else ""}
>          |FROM TABLE($tvfFromClause)
>          |GROUP BY `name`, window_start, window_end
>          |""".stripMargin
>  
> expect:
> "a,2020-10-09T08:00,2020-10-10T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",
> "a,2020-10-10T08:00,2020-10-11T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",
>  
> <utc> pass the test, but <ltz> only get:
> "a,2020-10-10T08:00,2020-10-11T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to