Hi Flink community,
I'm running into an issue where the legacy Group Window syntax returns
correct results but the equivalent TVF-based TUMBLE syntax returns counts
of 0 on the same source table. No errors are thrown.
*Table Definition*
CREATE TABLE event_source (
  user_id BIGINT,
  event_uuid STRING,
  raw_timestamp_ns BIGINT,
  -- ns / 1,000,000 = epoch millis; precision 3 tells TO_TIMESTAMP_LTZ the input is in milliseconds
  event_time AS TO_TIMESTAMP_LTZ(raw_timestamp_ns / 1000000, 3),
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
  'connector' = '...',
  'format' = 'json'
);
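A quick sanity check on the computed column and watermark, using the built-in CURRENT_WATERMARK function: project the raw nanosecond value, the derived event_time, and the current watermark side by side. This only confirms that the nanosecond-to-millisecond conversion produces plausible timestamps and that the watermark advances. It is a diagnostic sketch, not a fix.

```sql
-- Compare the raw nanosecond field with the derived rowtime and the current watermark.
-- CURRENT_WATERMARK returns NULL until the first watermark has been emitted.
SELECT
  user_id,
  raw_timestamp_ns,
  event_time,
  CURRENT_WATERMARK(event_time) AS current_wm
FROM event_source;
```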
*Legacy Syntax*
This works. It returns correct counts.
SELECT
  user_id,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
  COUNT(*) AS cnt
FROM event_source
GROUP BY
  user_id,
  TUMBLE(event_time, INTERVAL '1' MINUTE);
*TVF Tumbling Window*
This does not work. It runs without errors, but every count is 0.
SELECT
  user_id,
  window_start,
  window_end,
  COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(
    TABLE event_source,
    DESCRIPTOR(event_time),
    INTERVAL '1' MINUTE
  )
)
GROUP BY
  user_id,
  window_start,
  window_end;
Observations:
- No errors or exceptions are thrown.
- The TVF query produces output rows, but with cnt = 0.
- The legacy syntax produces correct non-zero counts on the same data.
- The event_time column is a computed column of type TIMESTAMP_LTZ(3)
derived from a nanosecond epoch.
- When I inspect the execution plans, the legacy query uses
*GroupWindowAggregate* as a single-phase operator and works correctly.
The TVF query uses the two-phase *LocalWindowAggregate +
GlobalWindowAggregate* path, and this is where the counts come back as
0. The two-phase path does not appear to count records correctly, even
though it receives the same input data.
Environment:
Flink version: 1.18.1
Deployment: Yarn
Since both queries read the same table with the same watermark, I would
expect identical results. The fact that the TVF path returns cnt = 0
suggests that records may not be assigned to windows correctly in the
LocalWindowAggregate phase, possibly because of how TIMESTAMP_LTZ is
handled in the two-phase (local-global) execution path.
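To separate window assignment from counting, the window TVF can be queried on its own, without any aggregation. Every row should carry a window_start/window_end pair that brackets its event_time. According to the Flink time zone documentation, when the time attribute is TIMESTAMP_LTZ, window_start and window_end are TIMESTAMP values in the session time zone (`table.local-time-zone`). Rows whose windows look wrong would point at assignment. Correct windows would point at the aggregation itself.

```sql
-- Window TVF without GROUP BY: shows the tumbling window assigned to each row
SELECT
  user_id,
  event_time,
  window_start,
  window_end
FROM TABLE(
  TUMBLE(TABLE event_source, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
);
```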
Has anyone encountered similar behavior? Any pointers would be appreciated.
Thanks,
Kanchi Masalia