Fred Teunissen created FLINK-39526:
--------------------------------------
Summary: Ever-growing checkpoint size in CROSS JOIN with inline
VALUE clause causes checkpointing failure
Key: FLINK-39526
URL: https://issues.apache.org/jira/browse/FLINK-39526
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 2.1.1, 2.2.0, 1.20.3
Reporter: Fred Teunissen
Attachments: Screenshot-dashboard.png, config.yaml,
flink--sql-client-4e3aa6faf0c2.log,
flink--standalonesession-0-f033734767f0.log,
flink--taskexecutor-0-59d69a5646c2.log, init.sql, job.sql
When a Flink SQL job performs a CROSS JOIN between a streaming table and an
inline VALUES clause (containing ROW-typed data), the checkpoint size grows
monotonically with each checkpoint. After approximately 16 checkpoints,
checkpointing fails, apparently due to memory or size constraints, and the
entire job fails.
*Steps to Reproduce:*
Create the input source table and output sink table:
{code:sql}
CREATE TABLE input (
  id STRING,
  ingest_time TIMESTAMP(3),
  WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '500',
  'number-of-rows' = '100000'
);

CREATE TABLE blackhole_table (
  id STRING,
  column1 ROW<id STRING, version STRING, description STRING>,
  ingest_time TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'
);
{code}
Execute the job that performs a CROSS JOIN with an inline VALUES clause:
{code:sql}
CREATE VIEW inlineData AS
SELECT
  cfg
FROM (VALUES
  CAST(ROW('id1', '1.0.0', 'description1')
    AS ROW<id STRING, version STRING, description STRING>),
  CAST(ROW('id2', '1.1.0', 'description1.1')
    AS ROW<id STRING, version STRING, description STRING>),
  CAST(ROW('id3', '2.0.0', 'description2')
    AS ROW<id STRING, version STRING, description STRING>)
) AS t(cfg);

INSERT INTO blackhole_table
SELECT
  input.id,
  cfg,
  ingest_time
FROM input
CROSS JOIN inlineData;
{code}
Configure the job with checkpointing enabled (5-second interval):
- Enable checkpointing via configuration
- Set checkpoint interval to 5 seconds
- Set process memory for both JobManager and TaskManager to 2048m
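A minimal sketch of these settings, assuming the standard Flink option keys
(the attached config.yaml is not reproduced here, so its actual contents may
differ). The checkpoint interval can be set from the SQL client session; the
two memory sizes are cluster-level options that would normally be set in
config.yaml before the cluster starts, so they appear only as comments:
{code:sql}
-- Assumed equivalent of the configuration described above.
-- Setting an interval enables periodic checkpointing.
SET 'execution.checkpointing.interval' = '5s';

-- Cluster-level memory options (set in config.yaml, not per session):
--   jobmanager.memory.process.size: 2048m
--   taskmanager.memory.process.size: 2048m
{code}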
Each checkpoint increases in size. After ~16 checkpoints (approximately 80
seconds), checkpointing fails and the job terminates.
*Expected Behavior:*
Checkpoint size should stabilize after initial state accumulation. The inline
VALUES clause is static and should not cause checkpoint size to grow with each
checkpoint cycle.
*Actual Behavior:*
Checkpoint size grows linearly with each checkpoint until the job fails. This
suggests state is being accumulated inappropriately, possibly due to improper
state cleanup or incorrect handling of the inline VALUES operator in cross join
scenarios.
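One way to narrow this down, offered as a suggestion rather than a confirmed
diagnosis, is to inspect the plan the planner produces for the statement. The
sketch below reuses the tables and view defined in the reproduction steps and
assumes it is run in the same SQL client session; the output depends on the
Flink version and is not shown here.
{code:sql}
-- Print the optimized plan to see which operator implements the
-- cross join and how the inline VALUES input is represented.
EXPLAIN PLAN FOR
INSERT INTO blackhole_table
SELECT
  input.id,
  cfg,
  ingest_time
FROM input
CROSS JOIN inlineData;
{code}
If the plan shows a regular streaming join, that operator keeps its inputs in
state, which would be one place to look for the growth.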