[ https://issues.apache.org/jira/browse/FLINK-39526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077020#comment-18077020 ]

Fabian Hueske commented on FLINK-39526:
---------------------------------------

Thanks for raising this problem and providing the detailed description, [~FredTing]!

The problem is that the Flink optimizer does not take into account that the VALUES input is finite (bounded).
The optimizer implements the join with the [regular streaming join operator|https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#regular-joins], which keeps both inputs in state in full. Because the unbounded streaming input is persisted as well, the state grows with every incoming record.
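
The state growth can be made concrete with a toy model. It is written in plain Python and is not Flink's actual join operator or API; the class and function names are hypothetical. A regular join keeps every record of both inputs because it does not know that the VALUES input has finished. A VALUES row could, as far as the operator knows, still arrive later, and it would then have to be matched against every streaming row seen so far:

{code:python}
class RegularCrossJoin:
    """Hypothetical toy model of a regular streaming join: rows of BOTH inputs are kept forever."""

    def __init__(self):
        self.stream_state = []  # unbounded input: grows with every record
        self.values_state = []  # bounded VALUES input: 3 rows in this example

    def on_stream_row(self, row):
        # Persist the row: a VALUES row arriving later must still be joined with it.
        self.stream_state.append(row)
        return [(row, cfg) for cfg in self.values_state]

    def on_values_row(self, cfg):
        # Persist the row: streaming rows arriving later must be joined with it.
        self.values_state.append(cfg)
        return [(row, cfg) for row in self.stream_state]

    def state_size(self):
        return len(self.stream_state) + len(self.values_state)


def simulate_regular_join(num_stream_rows):
    join = RegularCrossJoin()
    out = []
    for cfg in ["id1", "id2", "id3"]:
        out += join.on_values_row(cfg)
    for i in range(num_stream_rows):
        out += join.on_stream_row(f"row{i}")
    return join, out


if __name__ == "__main__":
    join, out = simulate_regular_join(1000)
    print(join.state_size(), len(out))  # 1003 3000
{code}

With 3 VALUES rows and 1,000 streaming rows, the modeled state holds 1,003 records. It keeps growing by one record per further streaming row, even though the joined output never needs the stored streaming rows again.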

Given Flink's feature set, this behavior is in-spec. Admittedly, growing state is unexpected for a query that joins an unbounded table with a static input (a VALUES clause).

It is certainly possible to implement an operator that persists only the static VALUES input in state and therefore has a constant state footprint.
However, fully addressing this problem is "surprisingly" complex, because Flink's operators cannot prioritize the consumption of one input over another. A suitable join operator would need to buffer the streaming (probe-side) input until the VALUES (build-side) input is complete. It would also need to detect when the build-side input is complete, which can be done via watermarks.
Moreover, an optimizer rule would need to recognize that one input of the join is a static VALUES input and choose the join implementation described above instead of the regular streaming join operator.
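
The operator described above can be sketched as a hypothetical, single-threaded Python model. It is not Flink code and the class name is made up. The end of the build side is modeled as a final watermark of math.inf, an assumption standing in for the maximum watermark that a bounded input emits when it finishes. Probe rows that arrive before the build side is complete are buffered. After that point, probe rows are joined directly and never stored:

{code:python}
import math

# Assumption: stands in for the final watermark emitted when a bounded input finishes.
END_OF_INPUT = math.inf


class BufferingValuesJoin:
    """Hypothetical cross join of an unbounded probe input with a bounded build input."""

    def __init__(self):
        self.build_rows = []    # bounded VALUES rows: the only long-lived state
        self.probe_buffer = []  # probe rows waiting for the build side to finish
        self.build_complete = False

    def on_build_row(self, cfg):
        if self.build_complete:
            raise ValueError("build input has already finished")
        self.build_rows.append(cfg)
        return []  # buffered probe rows are only joined once the build side is complete

    def on_build_watermark(self, watermark):
        if watermark < END_OF_INPUT or self.build_complete:
            return []
        self.build_complete = True
        # Join the probe rows that arrived too early, then drop them from state.
        out = [(row, cfg) for row in self.probe_buffer for cfg in self.build_rows]
        self.probe_buffer.clear()
        return out

    def on_probe_row(self, row):
        if not self.build_complete:
            # Operators cannot prioritize an input, so early probe rows must wait here.
            self.probe_buffer.append(row)
            return []
        # Build side is final: join immediately and do not store the probe row.
        return [(row, cfg) for cfg in self.build_rows]

    def state_size(self):
        return len(self.build_rows) + len(self.probe_buffer)


def simulate_buffering_join(num_stream_rows):
    join = BufferingValuesJoin()
    out = join.on_probe_row("early")  # arrives before the VALUES input is complete
    for cfg in ["id1", "id2", "id3"]:
        out += join.on_build_row(cfg)
    out += join.on_build_watermark(END_OF_INPUT)
    for i in range(num_stream_rows):
        out += join.on_probe_row(f"row{i}")
    return join, out


if __name__ == "__main__":
    join, out = simulate_buffering_join(1000)
    print(join.state_size(), len(out))  # 3 3003
{code}

Consider one early probe row, 3 VALUES rows, and 1,000 later probe rows. The model emits 3,003 joined pairs, and its state shrinks back to the 3 VALUES rows once the build side is complete. The probe buffer is the cost of not being able to prioritize inputs. If the operator could read the VALUES input to completion first, the buffer would stay empty.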

Since this behavior is expected with the current feature set, I'll convert this ticket into a feature request.

> Ever-growing checkpoint size in CROSS JOIN with inline VALUE clause causes 
> checkpointing failure
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39526
>                 URL: https://issues.apache.org/jira/browse/FLINK-39526
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.20.3, 2.2.0, 2.1.1
>            Reporter: Fred Teunissen
>            Priority: Major
>         Attachments: Screenshot-dashboard.png, config.yaml, 
> flink--sql-client-4e3aa6faf0c2.log, 
> flink--standalonesession-0-f033734767f0.log, 
> flink--taskexecutor-0-59d69a5646c2.log, init.sql, job.sql
>
>
> When executing a Flink SQL job that performs a CROSS JOIN between a streaming 
> table and an inline VALUES clause (containing ROW-typed data), checkpoint 
> size grows monotonically with each checkpoint. After approximately 16 
> checkpoints, the checkpoint fails due to memory/size constraints, causing the 
> entire job to fail.
> *Steps to Reproduce:*
> Create the input source table and output sink table:
> {code:sql}
> CREATE TABLE input (
>     id    STRING,
>     ingest_time  TIMESTAMP(3),
>     WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '500',
>     'number-of-rows' = '100000'
> );
> CREATE TABLE blackhole_table (
>     id STRING,
>     column1 ROW<id STRING, version STRING, description STRING>,
>     ingest_time TIMESTAMP(3)
> ) WITH (
>     'connector' = 'blackhole'
> );
> {code}
> Execute the job that performs a CROSS JOIN with an inline VALUES clause:
> {code:sql}
> CREATE VIEW inlineData AS
> SELECT
>     cfg
> FROM (VALUES
>     CAST(ROW('id1', '1.0.0', 'description1') AS ROW<id STRING, version STRING, description STRING>),
>     CAST(ROW('id2', '1.1.0', 'description1.1') AS ROW<id STRING, version STRING, description STRING>),
>     CAST(ROW('id3', '2.0.0', 'description2') AS ROW<id STRING, version STRING, description STRING>)
> ) AS t(cfg);
> INSERT INTO blackhole_table
> SELECT
>     input.id,
>     cfg,
>     ingest_time
> FROM input
> CROSS JOIN inlineData;
> {code}
> Configure the job with checkpointing enabled (5-second interval):
>  - Enable checkpointing via configuration
>  - Set checkpoint interval to 5 seconds
>  - Set process memory for both JobManager and TaskManager to 2048m
> Each checkpoint increases in size. After ~16 checkpoints (approximately 80 
> seconds), checkpointing fails and the job terminates.
> *Expected Behavior*
> Checkpoint size should stabilize after initial state accumulation. The inline 
> VALUES clause is static and should not cause checkpoint size to grow with 
> each checkpoint cycle.
> *Actual Behavior*
> Checkpoint size grows linearly with each checkpoint until the job fails. This 
> suggests state is being accumulated inappropriately, possibly due to improper 
> state cleanup or incorrect handling of the inline VALUES operator in cross 
> join scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
