Kanchi Masalia created FLINK-39589:
---------------------------------------

             Summary: Window TVF aggregation emits incorrect results with RocksDB state backend
                 Key: FLINK-39589
                 URL: https://issues.apache.org/jira/browse/FLINK-39589
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.18.1
            Reporter: Kanchi Masalia


Window TVF aggregation (TUMBLE/HOP/CUMULATE) silently produces incorrect 
results when using the RocksDB state backend. In each window, only one key 
emits the correct aggregate; every other row is emitted with count=0 and 
the wrong key. The legacy GROUP BY TUMBLE() syntax works correctly with the 
same backend and data.
h3. Reproduction

Using the datagen connector with 5 distinct keys:
{code:sql}
CREATE TEMPORARY TABLE datagen_test (
  event_userid BIGINT,
  ts BIGINT,
  event_time AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100000',
  'fields.event_userid.min' = '1',
  'fields.event_userid.max' = '5',
  'fields.ts.kind' = 'sequence',
  'fields.ts.start' = '1771345700000',
  'fields.ts.end'   = '1771346600000'
);

-- Broken (TVF syntax + RocksDB):
SELECT event_userid, window_start, window_end, COUNT(*) AS cnt
FROM TABLE(TUMBLE(TABLE datagen_test, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
GROUP BY event_userid, window_start, window_end;

-- Works (legacy syntax + RocksDB):
SELECT event_userid,
       TUMBLE_START(event_time, INTERVAL '1' MINUTE),
       TUMBLE_END(event_time, INTERVAL '1' MINUTE),
       COUNT(*) AS cnt
FROM datagen_test
GROUP BY event_userid, TUMBLE(event_time, INTERVAL '1' MINUTE);
{code}
The TVF output shows all rows emitted with the same key, most of them with 
cnt=0. The legacy output shows 5 distinct keys with correct counts 
(~12,000 each).
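
For reference, the ~12,000 figure can be checked with a little arithmetic. The sketch below is illustrative only and is not Flink code. It assumes the sequence field yields each millisecond value between start and end exactly once, and that event_userid is drawn uniformly from the 5 keys. The class and method names are hypothetical.

```java
final class WindowCountEstimate {

    // Start of the tumbling window containing ts (windows aligned to the epoch).
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Number of generated rows whose ts falls into [windowStart, windowStart + sizeMs),
    // assuming one row per millisecond value between firstTs and lastTs inclusive.
    static long rowsInWindow(long windowStart, long sizeMs, long firstTs, long lastTs) {
        long from = Math.max(windowStart, firstTs);
        long to = Math.min(windowStart + sizeMs - 1, lastTs);
        return Math.max(0, to - from + 1);
    }

    public static void main(String[] args) {
        long firstTs = 1771345700000L; // 'fields.ts.start'
        long lastTs = 1771346600000L;  // 'fields.ts.end'
        long sizeMs = 60_000L;         // INTERVAL '1' MINUTE
        int keys = 5;                  // event_userid in 1..5

        // First window is partial because the start value is not minute-aligned.
        long first = windowStart(firstTs, sizeMs);
        long fullWindow = first + sizeMs;

        System.out.println("rows in first (partial) window: "
                + rowsInWindow(first, sizeMs, firstTs, lastTs));
        System.out.println("rows in a full window: "
                + rowsInWindow(fullWindow, sizeMs, firstTs, lastTs));
        System.out.println("expected rows per key in a full window: "
                + rowsInWindow(fullWindow, sizeMs, firstTs, lastTs) / keys);
    }
}
```

Under these assumptions a full one-minute window holds 60,000 rows, or about 12,000 per key, which matches the legacy output. The first and last windows are partial and will show smaller counts.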

Switching to the hashmap backend resolves the issue:
{code:sql}
SET 'state.backend.type' = 'hashmap';
{code}
h3. Root Cause

RecordsWindowBuffer.flush() reuses the same WindowKey object when 
requiresCopy=false (the RocksDB path). These mutable key references are 
captured by TimerHeapInternalTimer, so when the timers fire they all 
resolve to the last key, reading and clearing the same state repeatedly.
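
The aliasing pattern described above can be shown in isolation. The following is a minimal standalone sketch, not Flink code: MutableKey and Timer are hypothetical stand-ins for a reusable key object and a timer that stores a reference to its key, and the copyKey flag only loosely mirrors the role of requiresCopy.

```java
import java.util.ArrayList;
import java.util.List;

final class SharedKeyTimerDemo {

    // Hypothetical stand-in for a mutable key object that is reused between records.
    static final class MutableKey {
        long userId;

        MutableKey(long userId) {
            this.userId = userId;
        }

        MutableKey copy() {
            return new MutableKey(userId);
        }
    }

    // Hypothetical stand-in for a timer that keeps a reference to its key.
    static final class Timer {
        final MutableKey key;

        Timer(MutableKey key) {
            this.key = key;
        }
    }

    // Registers one timer per user id (1..5) and returns the key each timer sees on firing.
    static List<Long> firedKeys(boolean copyKey) {
        MutableKey reused = new MutableKey(0);
        List<Timer> timers = new ArrayList<>();
        for (long userId = 1; userId <= 5; userId++) {
            reused.userId = userId; // the same object is overwritten for each record
            timers.add(new Timer(copyKey ? reused.copy() : reused));
        }
        List<Long> fired = new ArrayList<>();
        for (Timer timer : timers) {
            fired.add(timer.key.userId); // the key is read only when the timer "fires"
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("shared key: " + firedKeys(false)); // [5, 5, 5, 5, 5]
        System.out.println("copied key: " + firedKeys(true));  // [1, 2, 3, 4, 5]
    }
}
```

Without the copy, every timer observes the last key written, which is consistent with the reported symptom of all rows being emitted under a single key. Copying the key before it is captured keeps the five keys distinct in this sketch; whether that is the appropriate fix in Flink itself is not established here.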


