[
https://issues.apache.org/jira/browse/FLINK-38075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
kaitian updated FLINK-38075:
----------------------------
Description:
nextTriggerWatermark is calculated using LTZ (local time zone) time:
{code:java}
public static long getNextTriggerWatermark(
        long currentWatermark,
        long interval,
        long windowOffset,
        ZoneId shiftTimezone,
        boolean useDayLightSaving) {
    if (currentWatermark == Long.MAX_VALUE) {
        return currentWatermark;
    }

    long triggerWatermark;
    // consider the DST timezone
    if (useDayLightSaving) {
        long utcWindowStart =
                getWindowStartWithOffset(
                        toUtcTimestampMills(currentWatermark, shiftTimezone),
                        windowOffset,
                        interval);
        triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone);
    } else {
        long start = getWindowStartWithOffset(currentWatermark, windowOffset, interval);
        triggerWatermark = start + interval - 1;
    }
    // ... (the remainder of the method is not shown in this excerpt)
{code}
The record's sliceEnd, by contrast, is calculated using UTC time (for
simplicity, you can think of both as windowEnd):
{code:java}
public final long assignSliceEnd(RowData element, ClockService clock) {
    final long timestamp;
    if (rowtimeIndex >= 0) {
        if (element.isNullAt(rowtimeIndex)) {
            throw new RuntimeException(
                    "RowTime field should not be null,"
                            + " please convert it to a non-null long value.");
        }
        // Precision for row timestamp is always 3
        TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
        timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone);
    } else {
        // in processing time mode
        timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
    }
    return assignSliceEnd(timestamp);
}
{code}
For a given timestamp time1, the windowEnd computed from UTC time is
windowEnd1. LTZ time, however, carries a time zone offset; once that offset is
taken into account, time1 falls into a different window, and the computed
value is windowEnd2.
As a result, a watermark can arrive that sets nextTriggerWatermark too large,
so the window is not triggered in time (similar to the offset problem in
https://issues.apache.org/jira/browse/FLINK-36664). By the time the window is
eventually triggered, its data has already expired, so the data is lost.
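To make the windowEnd1 / windowEnd2 mismatch concrete, here is a minimal, self-contained sketch. It is not Flink code: the class and the helpers windowStart, shiftToUtc and shiftBack are hypothetical stand-ins for the kind of arithmetic described above, and the zone (Asia/Shanghai), the 1-day window and the zero window offset are assumptions chosen only for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class WindowEndMismatch {

    // Hypothetical stand-in: align ts down to the nearest window boundary.
    public static long windowStart(long ts, long offset, long size) {
        return ts - Math.floorMod(ts - offset, size);
    }

    // Hypothetical stand-in: take the wall-clock time of epochMillis in `zone`
    // and reinterpret that wall-clock time as if it were UTC.
    public static long shiftToUtc(long epochMillis, ZoneId zone) {
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return local.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Inverse of shiftToUtc: read the shifted value as wall-clock time in `zone`
    // and return the real epoch milliseconds.
    public static long shiftBack(long shiftedMillis, ZoneId zone) {
        LocalDateTime local =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(shiftedMillis), ZoneOffset.UTC);
        return local.atZone(zone).toInstant().toEpochMilli();
    }

    // Window end computed on the UTC-shifted timestamp, returned as real epoch millis.
    public static long windowEndViaShiftedTime(long epochMillis, long size, ZoneId zone) {
        long shifted = shiftToUtc(epochMillis, zone);
        long shiftedEnd = windowStart(shifted, 0L, size) + size;
        return shiftBack(shiftedEnd, zone);
    }

    // Window end computed directly on the raw epoch timestamp (no shift).
    public static long windowEndViaRawEpoch(long epochMillis, long size) {
        return windowStart(epochMillis, 0L, size) + size;
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumed zone, UTC+8 without DST
        long day = 24L * 60 * 60 * 1000;
        long time1 =
                LocalDateTime.parse("2020-10-10T00:07:59").atZone(zone).toInstant().toEpochMilli();

        // prints 2020-10-10T16:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEndViaShiftedTime(time1, day, zone)));
        // prints 2020-10-10T00:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEndViaRawEpoch(time1, day)));
    }
}
```

Under these assumptions the same timestamp yields two different window ends depending on whether the alignment runs on the shifted or the raw value. This is the kind of disagreement that can leave a trigger watermark out of step with the slice ends assigned to records.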
I added a SQL test to reproduce this problem:
input:
row("2020-10-10 00:07:59", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"),
row("2020-10-10 00:07:59", 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", "a"),
row("2020-10-10 11:11:59", 1, 1d, 1f, new JBigDecimal("1.11"), "Hi", "a"),
row("2020-10-10 11:11:59", 2, 2d, 2f, new JBigDecimal("2.22"), "Comment#1", "a"),
sql:
s"""
|CREATE TABLE T3 (
| `ts` ${if (useTimestampLtz) "BIGINT" else "STRING"},
| `int` INT,
| `double` DOUBLE,
| `float` FLOAT,
| `bigdec` DECIMAL(10, 2),
| `string` STRING,
| `name` STRING,
| `rowtime` AS
| ${if (useTimestampLtz) "TO_TIMESTAMP_LTZ(`ts`, 3)" else "TO_TIMESTAMP(`ts`)"},
| WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
|) WITH (
| 'connector' = 'values',
| 'data-id' = '$insertOnlyOffsetDataId',
| 'failing-source' = 'false'
|)
|""".stripMargin
s"""
|SELECT
| `name`
| ,window_start
| ,window_end
| ,COUNT(*)
| ,SUM(`bigdec`)
| ,MAX(`double`)
| ,MIN(`float`)
| -- agg function with data view does not support async state yet
| ${if (!enableAsyncState) aggFunctionsWithDataView else ""}
|FROM TABLE($tvfFromClause)
|GROUP BY `name`, window_start, window_end
|""".stripMargin
expect:
"a,2020-10-09T08:00,2020-10-10T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",
"a,2020-10-10T08:00,2020-10-11T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",
The <utc> case passes the test, but the <ltz> case only returns:
"a,2020-10-10T08:00,2020-10-11T08:00,2,3.33,2.0,1.0,2,Hi|Comment#1",
> [Window]Data loss when using ltz time.
> --------------------------------------
>
> Key: FLINK-38075
> URL: https://issues.apache.org/jira/browse/FLINK-38075
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.0.0, 1.20.1, 2.1.0
> Reporter: kaitian
> Priority: Major
> Labels: window
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)