[ https://issues.apache.org/jira/browse/FLINK-35336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-35336:
-----------------------------------
    Issue Type: New Feature  (was: Bug)

> SQL failed to restore from savepoint after change in default-parallelism
> ------------------------------------------------------------------------
>
>                 Key: FLINK-35336
>                 URL: https://issues.apache.org/jira/browse/FLINK-35336
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.1
>         Environment: Flink SQL Client, Flink 1.18.1 on MacOS
>            Reporter: Keith Lee
>            Priority: Major
>
> After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am
> observing the following exception when restoring the job from a savepoint
> with an unmodified statement set:
>
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
> file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff.
> Cannot map checkpoint/savepoint state for operator
> 46ba9b22862c3bbe9373c6abee964b2a to the new program, because the operator is
> not available in the new program. If you want to allow to skip this, you can
> set the --allowNonRestoredState option on the CLI.
> {quote}
>
> When the jobs are started without savepoints, their job graphs differ
> despite identical statements being run.
>
> There are two operators when default parallelism is 1:
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] ->
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end,
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]
> {quote}
>
> There are three operators when default parallelism is 4:
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] ->
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] ->
> LocalWindowAggregate[90])
> B: Sink: end
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]
> {quote}
>
> Notice that the operator 'Sink: end' is split out into its own vertex when
> parallelism is set to 4, causing the incompatibility between the job graphs.
> EXPLAIN PLAN did not show any difference in the syntax tree, physical plan
> or execution plan. I have attempted various configurations in
> `table.optimizer.*`.
>
> Steps to reproduce:
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
>
> EXECUTE STATEMENT SET
> BEGIN
> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>   interaction_type, interaction_target, interaction_tags, event_date,
>   event_hour, event_time)
> SELECT
>   user_id,
>   user_session,
>   interaction_type,
>   interaction_target,
>   interaction_tags,
>   DATE_FORMAT(event_time, 'yyyy-MM-dd'),
>   DATE_FORMAT(event_time, 'HH'),
>   event_time
> FROM UserBehaviourKafkaSource
> WHERE interaction_result LIKE '%ERROR%';
>
> INSERT INTO CampaignAggregationsJDBC
> SELECT
>   CONCAT_WS('/', interaction_tags, interaction_result,
>     DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
>     DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
>   interaction_tags AS campaign,
>   interaction_result,
>   COUNT(*) AS interaction_count,
>   window_start,
>   window_end
> FROM TABLE(TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time),
>   INTERVAL '10' SECONDS))
> GROUP BY window_start, window_end, interaction_tags, interaction_result;
> END;
>
> STOP JOB '<JOB_ID>' WITH SAVEPOINT;
> SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> SET 'table.exec.resource.default-parallelism' = '4';
> <Re-run DML at line 2>
> {quote}
>
> DDLs:
> {quote}-- S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   event_date STRING,
>   event_hour STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://<S3BUCKET>/userErrorExperience/',
>   'format' = 'json');
>
> -- Kafka Source
> ADD JAR 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   interaction_result STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behaviour',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'demoGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv');
>
> -- PostgreSQL Source/Sink
> ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> CREATE TABLE CampaignAggregationsJDBC (
>   id STRING,
>   campaign STRING,
>   interaction_result STRING,
>   interaction_count BIGINT,
>   window_start TIMESTAMP(3) WITHOUT TIME ZONE,
>   window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'campaign_aggregations');
> {quote}

-- This message was sent by Atlassian Jira
(v8.20.10#820010)
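Editor's addendum: as the exception message suggests, the restore itself can be forced through by allowing unmatched state to be skipped. In the SQL Client the configuration option `execution.savepoint.ignore-unclaimed-state` corresponds to the CLI's `--allowNonRestoredState` flag. This is a possible workaround only, not a fix for the job-graph mismatch described above: it silently discards the state of the operator that cannot be mapped (here the separated 'Sink: end' vertex). A minimal sketch of the SQL Client session, with `<SAVEPOINT_PATH>` left as a placeholder as in the reproduction steps:

```sql
-- Workaround sketch (discards state for operators missing from the new graph)
SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
SET 'table.exec.resource.default-parallelism' = '4';
-- Then re-run the EXECUTE STATEMENT SET from the steps to reproduce.
```

Whether losing that operator's state is acceptable depends on what 'Sink: end' holds; for an exactly-once file sink, dropping its state may cause duplicated or lost in-progress files, so this should be validated before use in production.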