[ https://issues.apache.org/jira/browse/FLINK-38075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kaitian updated FLINK-38075:
----------------------------
    Description: 
nextTriggerWatermark is calculated using local-time-zone (ltz) time:

 
{code:java}
public static long getNextTriggerWatermark(
        long currentWatermark,
        long interval,
        long windowOffset,
        ZoneId shiftTimezone,
        boolean useDayLightSaving) {
    if (currentWatermark == Long.MAX_VALUE) {
        return currentWatermark;
    }

    long triggerWatermark;
    // consider the DST timezone
    if (useDayLightSaving) {
        long utcWindowStart =
                getWindowStartWithOffset(
                        toUtcTimestampMills(currentWatermark, shiftTimezone),
                        windowOffset,
                        interval);
        triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone);
    } else {
        // the watermark is used as-is in this branch, without applying shiftTimezone
        long start = getWindowStartWithOffset(currentWatermark, windowOffset, interval);
        triggerWatermark = start + interval - 1;
    }
    // ... (the rest of the method is not included in this excerpt)
{code}
 

A record's sliceEnd, by contrast, is calculated using UTC time (both values 
can simply be thought of as windowEnd):

 
{code:java}
public final long assignSliceEnd(RowData element, ClockService clock) {
    final long timestamp;
    if (rowtimeIndex >= 0) {
        if (element.isNullAt(rowtimeIndex)) {
            throw new RuntimeException(
                    "RowTime field should not be null,"
                            + " please convert it to a non-null long value.");
        }
        // Precision for row timestamp is always 3
        TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
        timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone);
    } else {
        // in processing time mode
        timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
    }
    return assignSliceEnd(timestamp);
}
{code}
 


For a timestamp time1, the windowEnd calculated using UTC time is windowEnd1. 
Ltz time, however, carries an offset, and once that offset is taken into 
account, time1 is assigned to a different window whose end is windowEnd2.
As a result, a watermark may arrive that sets nextTriggerWatermark too large, 
so the window is not triggered in time (similar to the offset problem in 
https://issues.apache.org/jira/browse/FLINK-36664). By the time the window is 
finally triggered, its data has already expired, and the data is lost.
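
The mismatch described above can be illustrated with a small standalone sketch. It is not Flink code: shiftToLocalMillis and windowStart are hypothetical stand-ins that only approximate what toUtcTimestampMills and getWindowStartWithOffset appear to do in the excerpts, and the fixed UTC+8 zone, the one-day window, and the sample timestamp are assumptions chosen for illustration. It does not reproduce the data loss itself; it only shows that the shifted path and the unshifted path can place the same instant in different windows.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class LtzWindowEndMismatch {

    // Hypothetical stand-in for the time shift: take the local wall-clock time
    // of an epoch instant in the given zone and reinterpret it as UTC millis.
    static long shiftToLocalMillis(long epochMillis, ZoneId zone) {
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return local.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Simplified tumbling-window start (assumed formula, not copied from Flink).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneOffset.ofHours(8);          // assumed fixed-offset zone, no DST
        long day = 24L * 60 * 60 * 1000;              // assumed one-day tumbling window
        long time1 = Instant.parse("2025-01-01T20:00:00Z").toEpochMilli();

        // Path used for records in the excerpt: shift first, then assign the window.
        long windowEnd1 = windowStart(shiftToLocalMillis(time1, zone), 0L, day) + day;

        // Path of the non-DST branch in the excerpt: assign the window on the raw value.
        long windowEnd2 = windowStart(time1, 0L, day) + day;

        System.out.println("windowEnd1 (shifted)   = " + Instant.ofEpochMilli(windowEnd1));
        System.out.println("windowEnd2 (unshifted) = " + Instant.ofEpochMilli(windowEnd2));
        System.out.println("same window? " + (windowEnd1 == windowEnd2));
    }
}
```

With these assumed inputs the two paths disagree by a full window, which is the kind of gap between a record's window end and the trigger watermark that the report describes.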

> [Window]Data loss when using ltz time.
> --------------------------------------
>
>                 Key: FLINK-38075
>                 URL: https://issues.apache.org/jira/browse/FLINK-38075
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 2.0.0, 1.20.1, 2.1.0
>            Reporter: kaitian
>            Priority: Major
>              Labels: window


