Fred Teunissen created FLINK-39526:
--------------------------------------

             Summary: Ever-growing checkpoint size in CROSS JOIN with inline VALUES clause causes checkpointing failure
                 Key: FLINK-39526
                 URL: https://issues.apache.org/jira/browse/FLINK-39526
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 2.1.1, 2.2.0, 1.20.3
            Reporter: Fred Teunissen
          Attachments: Screenshot-dashboard.png, config.yaml, flink--sql-client-4e3aa6faf0c2.log, flink--standalonesession-0-f033734767f0.log, flink--taskexecutor-0-59d69a5646c2.log, init.sql, job.sql

When executing a Flink SQL job that performs a CROSS JOIN between a streaming 
table and an inline VALUES clause (containing ROW-typed data), checkpoint size 
grows monotonically with each checkpoint. After approximately 16 checkpoints, 
the checkpoint fails due to memory/size constraints, causing the entire job to 
fail.

*Steps to Reproduce:*
Create the input source table and output sink table:
{code:sql}
CREATE TABLE input (
    id    STRING,
    ingest_time  TIMESTAMP(3),
    WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '500',
    'number-of-rows' = '100000'
);

CREATE TABLE blackhole_table (
    id STRING,
    column1 ROW<id STRING, version STRING, description STRING>,
    ingest_time TIMESTAMP(3)
) WITH (
    'connector' = 'blackhole'
);
{code}
Execute the job that performs a CROSS JOIN with an inline VALUES clause:
{code:sql}
CREATE VIEW inlineData AS
SELECT
    cfg
FROM (VALUES
    CAST(ROW('id1', '1.0.0', 'description1') AS ROW<id STRING, version STRING, description STRING>),
    CAST(ROW('id2', '1.1.0', 'description1.1') AS ROW<id STRING, version STRING, description STRING>),
    CAST(ROW('id3', '2.0.0', 'description2') AS ROW<id STRING, version STRING, description STRING>)
) AS t(cfg);

INSERT INTO blackhole_table
SELECT
    input.id,
    cfg,
    ingest_time
FROM input
CROSS JOIN inlineData;
{code}
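To see which physical operator the CROSS JOIN is translated into (and therefore which operator is likely holding the growing state), the optimized plan can be printed in the SQL client before the job is submitted. This is a suggested diagnostic step, not part of the original reproduction, and the exact plan output will depend on the Flink version:
{code:sql}
-- Diagnostic only (not part of the original reproduction):
-- print the optimized plan for the INSERT to see which join operator is used
-- and what the VALUES side is planned as.
EXPLAIN PLAN FOR
INSERT INTO blackhole_table
SELECT
    input.id,
    cfg,
    ingest_time
FROM input
CROSS JOIN inlineData;
{code}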
Configure the job with checkpointing enabled (5-second interval):
 - Enable checkpointing via configuration
 - Set checkpoint interval to 5 seconds
 - Set process memory for both JobManager and TaskManager to 2048m
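The settings actually used are in the attached config.yaml. As a sketch, assuming the job is submitted from the SQL client, the checkpoint interval can also be set for the session as shown below. The memory sizes are cluster-level settings ({{jobmanager.memory.process.size}} and {{taskmanager.memory.process.size}}, 2048m each here) and have to be set in config.yaml before the cluster starts, not with SET:
{code:sql}
-- Session-level equivalent of the checkpoint interval listed above
-- (assumes the Flink SQL client; the original run configured this in config.yaml).
SET 'execution.checkpointing.interval' = '5s';
{code}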

Each checkpoint increases in size. After ~16 checkpoints (approximately 80 
seconds), checkpointing fails and the job terminates.

*Expected Behavior:*
Checkpoint size should stabilize after initial state accumulation. The inline 
VALUES clause is static and should not cause checkpoint size to grow with each 
checkpoint cycle.

*Actual Behavior:*
Checkpoint size grows linearly with each checkpoint until the job fails. This 
suggests state is being accumulated inappropriately, possibly due to improper 
state cleanup or incorrect handling of the inline VALUES operator in cross join 
scenarios.
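One way to test the state-accumulation hypothesis (an assumed experiment, not something tried in the original report) is to rerun the job with a short idle-state retention time. If the checkpoint size then levels off instead of growing until failure, that would point to operator state that is never cleaned up rather than to the VALUES source itself. The retention value below is an arbitrary choice, short enough to take effect within the roughly 80 seconds the job survives:
{code:sql}
-- Experiment only: expire operator state that has not been updated for 30 seconds.
-- A retention this short can also expire the VALUES side and drop join output,
-- so results produced under this setting are not meaningful; it only shows where the state lives.
SET 'table.exec.state.ttl' = '30 s';
{code}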



--
This message was sent by Atlassian Jira
(v8.20.10#820010)