Kanchi Masalia created FLINK-39589:
---------------------------------------
Summary: Window TVF aggregation emits incorrect results with RocksDB state backend
Key: FLINK-39589
URL: https://issues.apache.org/jira/browse/FLINK-39589
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.18.1
Reporter: Kanchi Masalia
Window TVF aggregation (TUMBLE/HOP/CUMULATE) silently produces incorrect
results when using the RocksDB state backend. For each window, only one key's
aggregate is correct; the rows for all other keys are emitted with count=0
and carry the wrong key. The legacy GROUP BY TUMBLE() syntax works correctly
with the same backend and data.
h3. Reproduction
Using the datagen connector with 5 distinct keys:
{code:sql}
CREATE TEMPORARY TABLE datagen_test (
  event_userid BIGINT,
  ts BIGINT,
  event_time AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100000',
  'fields.event_userid.min' = '1',
  'fields.event_userid.max' = '5',
  'fields.ts.kind' = 'sequence',
  'fields.ts.start' = '1771345700000',
  'fields.ts.end' = '1771346600000'
);

-- Broken (TVF syntax + RocksDB):
SELECT event_userid, window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE datagen_test, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
GROUP BY event_userid, window_start, window_end;

-- Works (legacy syntax + RocksDB):
SELECT event_userid,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE),
  TUMBLE_END(event_time, INTERVAL '1' MINUTE),
  COUNT(*) AS cnt
FROM datagen_test
GROUP BY event_userid, TUMBLE(event_time, INTERVAL '1' MINUTE);
{code}
The TVF query emits every row with the same key, and most of those rows have
cnt=0. The legacy query emits 5 distinct keys per window with correct counts
(~12,000 each).
Switching to the hashmap state backend avoids the issue:
{code:sql}
SET 'state.backend.type' = 'hashmap';
{code}
h3. Root Cause
RecordsWindowBuffer.flush() reuses a single WindowKey object when
requiresCopy=false (the RocksDB path). References to this mutable key are
captured by TimerHeapInternalTimer, so when the timers fire, they all resolve
to the last key written, and the operator reads and clears the same state
repeatedly.
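A minimal, self-contained Java sketch of the aliasing pattern described above. The classes MutableKey, CapturedTimer and KeyAliasingDemo are hypothetical stand-ins, not Flink's WindowKey, TimerHeapInternalTimer or RecordsWindowBuffer, and the copyKey flag only loosely mirrors requiresCopy. It shows why timers that hold references to one reused mutable object all observe the last key, and how copying the key before it is captured avoids that:

{code:java}
import java.util.ArrayList;
import java.util.List;

final class KeyAliasingDemo {

    // Hypothetical mutable key (NOT Flink's WindowKey).
    static final class MutableKey {
        long userId;

        MutableKey(long userId) { this.userId = userId; }

        MutableKey copy() { return new MutableKey(userId); }
    }

    // Hypothetical timer that keeps a reference to whatever key it is given
    // (NOT Flink's TimerHeapInternalTimer).
    static final class CapturedTimer {
        final MutableKey key;

        CapturedTimer(MutableKey key) { this.key = key; }
    }

    // Registers one timer per key while overwriting a single shared key object.
    // copyKey=false hands the shared object to every timer (analogous to requiresCopy=false);
    // copyKey=true hands each timer its own copy. Returns the keys observed when timers "fire".
    static List<Long> firedKeys(int numKeys, boolean copyKey) {
        MutableKey reused = new MutableKey(0);
        List<CapturedTimer> timers = new ArrayList<>();
        for (long id = 1; id <= numKeys; id++) {
            reused.userId = id; // mutate the shared object in place
            MutableKey key = copyKey ? reused.copy() : reused;
            timers.add(new CapturedTimer(key));
        }
        List<Long> fired = new ArrayList<>();
        for (CapturedTimer t : timers) {
            fired.add(t.key.userId); // the key is only read when the timer fires
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("shared key:  " + firedKeys(5, false)); // [5, 5, 5, 5, 5]
        System.out.println("copied keys: " + firedKeys(5, true));  // [1, 2, 3, 4, 5]
    }
}
{code}

With copyKey=false every timer observes key 5, matching the symptom of all rows carrying one key; with copyKey=true each timer keeps the key it was registered with.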