[ https://issues.apache.org/jira/browse/FLINK-39526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske updated FLINK-39526:
----------------------------------
Summary: Support CROSS JOIN with inline VALUE clause with static state size
(was: Ever-growing checkpoint size in CROSS JOIN with inline VALUE clause
causes checkpointing failure)
> Support CROSS JOIN with inline VALUE clause with static state size
> ------------------------------------------------------------------
>
> Key: FLINK-39526
> URL: https://issues.apache.org/jira/browse/FLINK-39526
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Runtime
> Affects Versions: 1.20.3, 2.2.0, 2.1.1
> Reporter: Fred Teunissen
> Priority: Major
> Attachments: Screenshot-dashboard.png, config.yaml,
> flink--sql-client-4e3aa6faf0c2.log,
> flink--standalonesession-0-f033734767f0.log,
> flink--taskexecutor-0-59d69a5646c2.log, init.sql, job.sql
>
>
> When executing a Flink SQL job that performs a CROSS JOIN between a streaming
> table and an inline VALUES clause (containing ROW-typed data), the checkpoint
> size grows monotonically with each checkpoint. After approximately 16
> checkpoints, checkpointing fails due to memory/size constraints, causing the
> entire job to fail.
> *Steps to Reproduce:*
> Create the input source table and output sink table:
> {code:sql}
> CREATE TABLE input (
>   id STRING,
>   ingest_time TIMESTAMP(3),
>   WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '500',
>   'number-of-rows' = '100000'
> );
>
> CREATE TABLE blackhole_table (
>   id STRING,
>   column1 ROW<id STRING, version STRING, description STRING>,
>   ingest_time TIMESTAMP(3)
> ) WITH (
>   'connector' = 'blackhole'
> );
> {code}
> Execute the job that performs a CROSS JOIN with an inline VALUES clause:
> {code:sql}
> CREATE VIEW inlineData AS
> SELECT
>   cfg
> FROM (VALUES
>   CAST(ROW('id1', '1.0.0', 'description1') AS ROW<id STRING, version STRING, description STRING>),
>   CAST(ROW('id2', '1.1.0', 'description1.1') AS ROW<id STRING, version STRING, description STRING>),
>   CAST(ROW('id3', '2.0.0', 'description2') AS ROW<id STRING, version STRING, description STRING>)
> ) AS t(cfg);
>
> INSERT INTO blackhole_table
> SELECT
>   input.id,
>   cfg,
>   ingest_time
> FROM input
> CROSS JOIN inlineData;
> {code}
> Configure the job with checkpointing enabled:
> - Enable checkpointing with a 5-second interval
> - Set the process memory of both the JobManager and the TaskManager to 2048m
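> The checkpoint setting above can be sketched as a SQL client statement, assuming the job is submitted from the SQL client against a session cluster. `execution.checkpointing.interval` is Flink's checkpoint interval option; setting it to a positive duration enables periodic checkpointing. The memory settings are cluster-level options (`jobmanager.memory.process.size` and `taskmanager.memory.process.size`, both `2048m`) that belong in the cluster configuration file rather than in the SQL session.
> {code:sql}
> -- Enables periodic checkpointing every 5 seconds for jobs submitted from this session
> SET 'execution.checkpointing.interval' = '5s';
> {code}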
> Each checkpoint increases in size. After ~16 checkpoints (approximately 80
> seconds), checkpointing fails and the job terminates.
> *Expected Behavior:*
> Checkpoint size should stabilize after initial state accumulation. The inline
> VALUES clause is static and should not cause checkpoint size to grow with
> each checkpoint cycle.
> *Actual Behavior:*
> Checkpoint size grows linearly with each checkpoint until the job fails. This
> suggests that state accumulates unnecessarily, possibly due to improper state
> cleanup or incorrect handling of the inline VALUES operator in CROSS JOIN
> queries.
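> To narrow down where the state accumulates, the plan of the reproducing statement can be inspected with Flink's EXPLAIN statement. This is a diagnostic sketch, not a fix: it shows which join operator the planner chooses for the CROSS JOIN. If that operator is a regular streaming join, it would be expected to keep rows from both inputs in state, which would be consistent with the observed checkpoint growth.
> {code:sql}
> -- Prints the abstract syntax tree and the optimized physical and execution plans
> EXPLAIN PLAN FOR
> INSERT INTO blackhole_table
> SELECT input.id, cfg, ingest_time
> FROM input
> CROSS JOIN inlineData;
> {code}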
--
This message was sent by Atlassian Jira
(v8.20.10#820010)