[ https://issues.apache.org/jira/browse/FLINK-39526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-39526:
----------------------------------
    Summary: Support CROSS JOIN with inline VALUE clause with static state size 
 (was: Ever-growing checkpoint size in CROSS JOIN with inline VALUE clause 
causes checkpointing failure)

> Support CROSS JOIN with inline VALUE clause with static state size
> ------------------------------------------------------------------
>
>                 Key: FLINK-39526
>                 URL: https://issues.apache.org/jira/browse/FLINK-39526
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Runtime
>    Affects Versions: 1.20.3, 2.2.0, 2.1.1
>            Reporter: Fred Teunissen
>            Priority: Major
>         Attachments: Screenshot-dashboard.png, config.yaml, 
> flink--sql-client-4e3aa6faf0c2.log, 
> flink--standalonesession-0-f033734767f0.log, 
> flink--taskexecutor-0-59d69a5646c2.log, init.sql, job.sql
>
>
> When a Flink SQL job performs a CROSS JOIN between a streaming table and an
> inline VALUES clause (containing ROW-typed data), the checkpoint size grows
> with every checkpoint. After approximately 16 checkpoints, a checkpoint fails
> due to memory/size constraints, and the entire job fails with it.
> *Steps to Reproduce:*
> Create the input source table and output sink table:
> {code:sql}
> -- Bounded datagen source: 100000 rows at 500 rows/s, i.e. about 200 s of input
> CREATE TABLE input (
>     id    STRING,
>     ingest_time  TIMESTAMP(3),
>     WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '500',
>     'number-of-rows' = '100000'
> );
> CREATE TABLE blackhole_table (
>     id STRING,
>     column1 ROW<id STRING, version STRING, description STRING>,
>     ingest_time TIMESTAMP(3)
> ) WITH (
>     'connector' = 'blackhole'
> );
> {code}
> Execute the job that performs a CROSS JOIN with an inline VALUES clause:
> {code:sql}
> -- Static three-row table built from an inline VALUES clause
> CREATE VIEW inlineData AS
> SELECT
>     cfg
> FROM (VALUES
>     CAST(ROW('id1', '1.0.0', 'description1')
>         AS ROW<id STRING, version STRING, description STRING>),
>     CAST(ROW('id2', '1.1.0', 'description1.1')
>         AS ROW<id STRING, version STRING, description STRING>),
>     CAST(ROW('id3', '2.0.0', 'description2')
>         AS ROW<id STRING, version STRING, description STRING>)
> ) AS t(cfg);
> -- Every input row is paired with each of the three static rows
> INSERT INTO blackhole_table
> SELECT
>     input.id,
>     cfg,
>     ingest_time
> FROM input
> CROSS JOIN inlineData;
> {code}
> Configure the job as follows:
>  - Enable checkpointing with a 5-second interval
>  - Set the process memory of both the JobManager and the TaskManager to 2048m
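> In the SQL client, the checkpoint interval can be set per session as sketched below. The memory sizes are cluster options applied when the JobManager and TaskManager start, so they belong in the Flink configuration file rather than in a SET statement. This is an illustrative sketch; the exact settings used for this report are presumably those in the attached config.yaml.
> {code:sql}
> -- Session-level setting in the SQL client: trigger a checkpoint every 5 seconds
> SET 'execution.checkpointing.interval' = '5s';
>
> -- Memory sizes are read at cluster startup; in the configuration file
> -- they would look like this, for example:
> --   jobmanager.memory.process.size: 2048m
> --   taskmanager.memory.process.size: 2048m
> {code}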
> Each successive checkpoint is larger than the previous one. After ~16
> checkpoints (approximately 80 seconds), checkpointing fails and the job terminates.
> *Expected Behavior:*
> Checkpoint size should stabilize after initial state accumulation. The inline 
> VALUES clause is static and should not cause checkpoint size to grow with 
> each checkpoint cycle.
> *Actual Behavior:*
> Checkpoint size grows linearly with each checkpoint until the job fails. This 
> suggests state is being accumulated inappropriately, possibly due to improper 
> state cleanup or incorrect handling of the inline VALUES operator in cross 
> join scenarios.
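> To see which join operator the planner selects for this CROSS JOIN, and therefore which operator holds the growing state, the optimized plan can be printed with Flink's EXPLAIN statement. This is a sketch assuming the tables and view from the steps above have already been created; the resulting plan is not reproduced here.
> {code:sql}
> -- Print the optimized plan for the failing statement
> EXPLAIN PLAN FOR
> INSERT INTO blackhole_table
> SELECT
>     input.id,
>     cfg,
>     ingest_time
> FROM input
> CROSS JOIN inlineData;
> {code}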



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
