[ 
https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Keith Lee updated FLINK-35336:
------------------------------
    Description: 
After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am 
observing the following exception on restoring job from savepoint with an 
unmodified statement set. 
 
{quote}[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff.
 Cannot map checkpoint/savepoint state for operator 
46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI.
{quote}
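
For reference, the SQL Client counterpart of the --allowNonRestoredState CLI 
flag named in the error should be the 'execution.savepoint.ignore-unclaimed-state' 
option. The following is a minimal sketch, assuming Flink 1.18 option names and 
the same savepoint path placeholder used in the steps to reproduce. It only 
suppresses the error by discarding state that cannot be mapped to an operator 
in the new job; it does not remove the job graph difference described here, so 
it is a diagnostic aid rather than a fix.

```sql
-- Assumption: option names as of Flink 1.18; later releases may rename them.
SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';

-- Skip savepoint state that cannot be mapped to an operator in the new job.
-- Caution: the state of the unmatched operator is discarded on restore.
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';

SET 'table.exec.resource.default-parallelism' = '4';
```

After these settings, the statement set would be re-run as in the steps to 
reproduce.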
When started without a savepoint, the job graphs of the two jobs differ even 
though the statements being run are identical.

There are 2 operators when default parallelism is 1.
{quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] -> 
StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end, Calc[71] 
-> LocalWindowAggregate[72])
B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
{quote}
There are 3 operators when default parallelism is 4.
{quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] -> 
StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] -> 
LocalWindowAggregate[90]) 
B: Sink: end 
C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
{quote}
 
Notice that the operator 'Sink: end' is separated out when parallelism is set 
to 4, which makes the two job graphs incompatible. EXPLAIN PLAN showed no 
difference in the syntax tree, physical plan or execution plan between the two 
settings.
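
One further check that may help is the JSON_EXECUTION_PLAN explain detail, 
which should print the stream graph as JSON, including per-node parallelism. 
The sketch below assumes the tables from the DDLs in this report are already 
registered and that EXPLAIN accepts a statement set in this form, as the Flink 
1.18 documentation describes. Running it once per parallelism setting and 
comparing the two outputs is a suggestion, not a confirmed way to expose the 
'Sink: end' split.

```sql
-- Repeat with '1' and compare the two JSON outputs.
SET 'table.exec.resource.default-parallelism' = '4';

EXPLAIN JSON_EXECUTION_PLAN
STATEMENT SET BEGIN
    INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
        interaction_type, interaction_target, interaction_tags, event_date,
        event_hour, event_time)
    SELECT
        user_id,
        user_session,
        interaction_type,
        interaction_target,
        interaction_tags,
        DATE_FORMAT(event_time, 'yyyy-MM-dd'),
        DATE_FORMAT(event_time, 'HH'),
        event_time
    FROM UserBehaviourKafkaSource
    WHERE
        interaction_result LIKE '%ERROR%';

    INSERT INTO CampaignAggregationsJDBC
    SELECT
        CONCAT_WS('/', interaction_tags, interaction_result,
            DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
            DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
        interaction_tags AS campaign,
        interaction_result,
        COUNT(*) AS interaction_count,
        window_start,
        window_end
    FROM
        TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time),
            INTERVAL '10' SECONDS))
    GROUP BY window_start, window_end, interaction_tags, interaction_result;
END;
```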

I have attempted various configurations in `table.optimizer.*`.

Steps to reproduce
{quote}SET 'table.exec.resource.default-parallelism' = '1';
EXECUTE STATEMENT SET BEGIN 
    INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, 
interaction_type, interaction_target, interaction_tags, event_date, event_hour, 
event_time)
    SELECT
        user_id, 
        user_session,
        interaction_type,
        interaction_target,
        interaction_tags, 
        DATE_FORMAT(event_time , 'yyyy-MM-dd'),
        DATE_FORMAT(event_time , 'HH'),
        event_time 
    FROM UserBehaviourKafkaSource 
    WHERE 
        interaction_result LIKE '%ERROR%';

    INSERT INTO CampaignAggregationsJDBC 
    SELECT 
        CONCAT_WS('/', interaction_tags, interaction_result, 
DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
'YYYY-MM-DD HH:mm:ss.SSS')) AS id, 
        interaction_tags as campaign, 
        interaction_result, 
        COUNT(*) AS interaction_count, 
        window_start, 
        window_end 
    FROM 
        TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), 
INTERVAL '10' SECONDS)) 
    GROUP BY window_start, window_end, interaction_tags, interaction_result; 
END;

STOP JOB '<JOB_ID>' WITH SAVEPOINT;
SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
SET 'table.exec.resource.default-parallelism' = '4';

<Re-run DML at line 2>
{quote}
DDLs
{quote}-- S3 Sink
CREATE TABLE UserErrorExperienceS3Sink (
  user_id BIGINT,
  user_session STRING,
  interaction_type STRING,
  interaction_target STRING,
  interaction_tags STRING,
  event_date STRING,
  event_hour STRING,
  event_time TIMESTAMP(3) WITHOUT TIME ZONE)
PARTITIONED BY (event_date, event_hour)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://<S3BUCKET>/userErrorExperience/',
  'format' = 'json');

-- Kafka Source
ADD JAR 
'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
CREATE TABLE UserBehaviourKafkaSource (
  user_id BIGINT,
  user_session STRING,
  interaction_type STRING,
  interaction_target STRING,
  interaction_tags STRING,
  interaction_result STRING,
  event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
WITH (
  'connector' = 'kafka',
  'topic' = 'user_behaviour',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demoGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv');

-- PostgreSQL Source/Sink
ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
CREATE TABLE CampaignAggregationsJDBC (
  id STRING,
  campaign STRING,
  interaction_result STRING,
  interaction_count BIGINT,
  window_start TIMESTAMP(3) WITHOUT TIME ZONE,
  window_end TIMESTAMP(3) WITHOUT TIME ZONE)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'campaign_aggregations');
{quote}

  was:
After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am 
observing the following exception on restoring job from savepoint with an 
unmodified statement set. 
 
{quote}[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff. Cannot map 
checkpoint/savepoint state for operator 46ba9b22862c3bbe9373c6abee964b2a to the 
new program, because the operator is not available in the new program. If you 
want to allow to skip this, you can set the --allowNonRestoredState option on 
the CLI.{quote}

When started without savepoints, the jobgraph differs for the jobs despite 
identical statements being ran.

There are 2 operators when default parallelism is 1.
{quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] -> 
StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end, Calc[71] 
-> LocalWindowAggregate[72])
B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: 
CampaignAggregationsJDBC[76]{quote}

Three operators when default parallelism is 4.

{quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] -> 
StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] -> 
LocalWindowAggregate[90]) 
B: Sink: end 
C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: 
CampaignAggregationsJDBC[94]{quote}
 
Notice that the operator 'Sink: end' is separated out when parallelism is set 
to 4, causing the incompatibility in job graph. EXPLAIN PLAN did not show any 
difference between syntax tree, physical plan or execution plan.


I have attempted various configurations in `table.optimizer.*`. 

Steps to reproduce

{quote}SET 'table.exec.resource.default-parallelism' = '1';
EXECUTE STATEMENT SET BEGIN 
    INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, 
interaction_type, interaction_target, interaction_tags, event_date, event_hour, 
event_time)
    SELECT
        user_id, 
        user_session,
        interaction_type,
        interaction_target,
        interaction_tags, 
        DATE_FORMAT(event_time , 'yyyy-MM-dd'),
        DATE_FORMAT(event_time , 'HH'),
        event_time 
    FROM UserBehaviourKafkaSource 
    WHERE 
        interaction_result Like '%ERROR%'; 

    INSERT INTO CampaignAggregationsJDBC 
    SELECT 
        CONCAT_WS('/', interaction_tags, interaction_result, 
DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
'YYYY-MM-DD HH:mm:ss.SSS')) AS id, 
        interaction_tags as campaign, 
        interaction_result, 
        COUNT(*) AS interaction_count, 
        window_start, 
        window_end 
    FROM 
        TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), 
INTERVAL '10' SECONDS)) 
    GROUP BY window_start, window_end, interaction_tags, interaction_result; 
END;

STOP JOB '<JOB_ID>' WITH SAVEPOINT;
SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
SET 'table.exec.resource.default-parallelism' = '4';

<Re-run DML at line 2>
{quote}

DQLs


{quote}-- S3 Sink
CREATE TABLE UserErrorExperienceS3Sink (
  user_id BIGINT,
  user_session STRING,
  interaction_type STRING,
  interaction_target STRING,
  interaction_tags STRING,
  event_date STRING,
  event_hour STRING,
  event_time TIMESTAMP(3) WITHOUT TIME ZONE)
PARTITIONED BY (event_date, event_hour)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://<S3BUCKET>/userErrorExperience/',
  'format' = 'json');

-- Kafka Source
ADD JAR 
'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
CREATE TABLE UserBehaviourKafkaSource (
  user_id BIGINT,
  user_session STRING,
  interaction_type STRING,
  interaction_target STRING,
  interaction_tags STRING,
  interaction_result STRING,
  event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
WITH (
  'connector' = 'kafka',
  'topic' = 'user_behaviour',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demoGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv');

-- PostgreSQL Source/Sink 
ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
CREATE TABLE CampaignAggregationsJDBC (
  id STRING,
  campaign STRING,
  interaction_result STRING,
  interaction_count BIGINT,
  window_start TIMESTAMP(3) WITHOUT TIME ZONE,
  window_end TIMESTAMP(3) WITHOUT TIME ZONE)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'campaign_aggregations');
{quote}


> SQL failed to restore from savepoint after change in default-parallelism
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-35336
>                 URL: https://issues.apache.org/jira/browse/FLINK-35336
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.1
>         Environment: Flink SQL Client, Flink 1.18.1 on MacOS
>            Reporter: Keith Lee
>            Priority: Major
>
> After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am 
> observing the following exception on restoring job from savepoint with an 
> unmodified statement set. 
>  
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
> file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff.
>  Cannot map checkpoint/savepoint state for operator 
> 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator is 
> not available in the new program. If you want to allow to skip this, you can 
> set the --allowNonRestoredState option on the CLI.
> {quote}
> When started without a savepoint, the job graphs of the two jobs differ even 
> though the statements being run are identical.
> There are 2 operators when default parallelism is 1.
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] -> 
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end, 
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
> {quote}
> There are 3 operators when default parallelism is 4.
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] -> 
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] -> 
> LocalWindowAggregate[90]) 
> B: Sink: end 
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
> {quote}
>  
> Notice that the operator 'Sink: end' is separated out when parallelism is set 
> to 4, which makes the two job graphs incompatible. EXPLAIN PLAN showed no 
> difference in the syntax tree, physical plan or execution plan between the two 
> settings.
> I have attempted various configurations in `table.optimizer.*`.
> Steps to reproduce
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
> EXECUTE STATEMENT SET BEGIN 
>     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, 
> interaction_type, interaction_target, interaction_tags, event_date, 
> event_hour, event_time)
>     SELECT
>         user_id, 
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags, 
>         DATE_FORMAT(event_time , 'yyyy-MM-dd'),
>         DATE_FORMAT(event_time , 'HH'),
>         event_time 
>     FROM UserBehaviourKafkaSource 
>     WHERE 
>         interaction_result LIKE '%ERROR%';
>     INSERT INTO CampaignAggregationsJDBC 
>     SELECT 
>         CONCAT_WS('/', interaction_tags, interaction_result, 
> DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'), DATE_FORMAT(window_end, 
> 'YYYY-MM-DD HH:mm:ss.SSS')) AS id, 
>         interaction_tags as campaign, 
>         interaction_result, 
>         COUNT(*) AS interaction_count, 
>         window_start, 
>         window_end 
>     FROM 
>         TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), 
> INTERVAL '10' SECONDS)) 
>     GROUP BY window_start, window_end, interaction_tags, interaction_result; 
> END;
> STOP JOB '<JOB_ID>' WITH SAVEPOINT;
> SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> SET 'table.exec.resource.default-parallelism' = '4';
> <Re-run DML at line 2>
> {quote}
> DDLs
> {quote}-- S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   event_date STRING,
>   event_hour STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://<S3BUCKET>/userErrorExperience/',
>   'format' = 'json');
> -- Kafka Source
> ADD JAR 
> 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   interaction_result STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behaviour',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'demoGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv');
> -- PostgreSQL Source/Sink
> ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> CREATE TABLE CampaignAggregationsJDBC (
>   id STRING,
>   campaign STRING,
>   interaction_result STRING,
>   interaction_count BIGINT,
>   window_start TIMESTAMP(3) WITHOUT TIME ZONE,
>   window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'campaign_aggregations');
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
