[ 
https://issues.apache.org/jira/browse/FLINK-39673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Han Yin updated FLINK-39673:
----------------------------
    Affects Version/s: 1.20.3
                       1.20.2
                       1.20.1
                       1.20.0
                       2.0.0

> SQL jobs may ignore user-configured claim mode with deprecated config key
> -------------------------------------------------------------------------
>
>                 Key: FLINK-39673
>                 URL: https://issues.apache.org/jira/browse/FLINK-39673
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 2.0.0, 1.20.0, 1.20.1, 1.20.2, 1.20.3, 1.20.4
>            Reporter: Han Yin
>            Priority: Major
>
> h1. Background
> As of Flink 1.20, the state recovery configuration keys have been renamed:
> ||Old Key (deprecated)||New Key||
> |execution.savepoint-restore-mode|execution.state-recovery.claim-mode|
> The new keys are defined in `StateRecoveryOptions` with 
> `withDeprecatedKeys(...)` pointing to the old keys:
> {code:java}
> // StateRecoveryOptions.java
> public static final ConfigOption<RestoreMode> RESTORE_MODE =
>     key("execution.state-recovery.claim-mode")
>         .enumType(RestoreMode.class)
>         .defaultValue(RestoreMode.DEFAULT)  // DEFAULT = NO_CLAIM
>         .withDeprecatedKeys("execution.savepoint-restore-mode")
>         ...{code}
> Flink's `withDeprecatedKeys` mechanism defines the following resolution rules 
> (see FLIP-406 for details):
>  * If the user configures *only the new key* 
> (`execution.state-recovery.claim-mode`), it takes effect.
>  * If the user configures *only the old (deprecated) key* 
> (`execution.savepoint-restore-mode`), it also takes effect — 
> `withDeprecatedKeys` ensures backward compatibility by falling back to the 
> old key.
>  * If *both* keys are configured, the *new key takes precedence* over the 
> deprecated one.
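The three rules above can be illustrated with a minimal sketch. It is a simplified, hypothetical model: a plain `Map` stands in for Flink's `Configuration`, and the class `DeprecatedKeyLookup` and its `resolve` method are invented for illustration rather than taken from Flink.

```java
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of deprecated-key fallback.
// A plain Map stands in for Flink's Configuration; this is not Flink's implementation.
final class DeprecatedKeyLookup {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final List<String> DEPRECATED_KEYS = List.of("execution.savepoint-restore-mode");
    static final String DEFAULT_VALUE = "NO_CLAIM";

    // Resolution order: new key, then deprecated keys in declaration order, then the default.
    static String resolve(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        for (String deprecatedKey : DEPRECATED_KEYS) {
            if (conf.containsKey(deprecatedKey)) {
                return conf.get(deprecatedKey);
            }
        }
        return DEFAULT_VALUE;
    }

    public static void main(String[] args) {
        String oldKey = DEPRECATED_KEYS.get(0);
        System.out.println(resolve(Map.of(NEW_KEY, "CLAIM")));                     // new key only
        System.out.println(resolve(Map.of(oldKey, "CLAIM")));                      // old key only
        System.out.println(resolve(Map.of(NEW_KEY, "NO_CLAIM", oldKey, "CLAIM"))); // both set
        System.out.println(resolve(Map.of()));                                     // neither set
    }
}
```

In this model the first two calls yield `CLAIM`, while the last two yield `NO_CLAIM`: once the new key is present, the deprecated key is never consulted, whatever value it holds.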
> h1. Problem and Steps to Reproduce
> Start a Flink standalone cluster.
> Submit a SQL job restoring from a checkpoint. For example:
> {code:sql}
> -- SQL
> SET 'execution.savepoint.path' = 'hdfs:///checkpoints/xxx/chk-xx';
> SET 'execution.state-recovery.claim-mode' = 'CLAIM';
> CREATE TABLE source_table (
>     word STRING
> ) WITH (
>     'connector' = 'datagen',
>     'rows-per-second' = '5',
>     'fields.word.length' = '10'
> );
> CREATE TABLE sink_table (
>     word       STRING,
>     word_count BIGINT
> ) WITH (
>     'connector' = 'print'
> );
> INSERT INTO sink_table
> SELECT word, COUNT(*) AS word_count
> FROM source_table
> GROUP BY word; {code}
>  
> The SQL above sets the claim mode using the {*}new key{*}. In this case, the 
> job restores in *CLAIM* mode as expected.
>  
> Now change the 2nd `SET` statement to use the *deprecated key* instead:
> {code:java}
> SET 'execution.savepoint-restore-mode' = 'CLAIM';{code}
> {*}Expected{*}: The job should still restore in *CLAIM* mode — per the 
> `withDeprecatedKeys` contract, the old key should be honored when the new key 
> is not explicitly set.
> {*}Actual{*}: The job restores in *NO_CLAIM* mode — the default value — which 
> directly conflicts with what the user explicitly specified via the SQL `SET` 
> statement. The practical consequence is that Flink does *not* claim ownership 
> of the restored checkpoint. As the job continues running and produces newer 
> checkpoints that subsume the old ones, the original checkpoint provided above 
> will *never be cleaned up* and will remain orphaned on HDFS indefinitely, 
> leading to unnecessary storage consumption.
> h1. Cause: User-configured value can be overridden by default value
> When a SQL job is submitted through SQL Gateway, the following happens:
> h2. Step 1 — CommandLine parsing creates `SavepointRestoreSettings` with 
> defaults
> `CliFrontendParser.createSavepointRestoreSettings()` is called. When the user 
> does *not* provide `-claimMode` or `-restoreMode` on the command line, the 
> method falls into the `else` branch and uses the default value:
> {code:java}
> // CliFrontendParser.java
> public static SavepointRestoreSettings createSavepointRestoreSettings(
>         CommandLine commandLine) {
>     if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
>         ...
>         if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
>             restoreMode = ...;
>         } else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
>             restoreMode = ...;
>         } else {
>             restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue(); // NO_CLAIM
>         }
>         return SavepointRestoreSettings.forPath(
>                 savepointPath, allowNonRestoredState, restoreMode);
>     }
>     ...
> } {code}
> Note the asymmetry in what `toConfiguration()` (Step 2) later writes: 
> `SAVEPOINT_PATH` is set conditionally (only when non-null), but `RESTORE_MODE` 
> and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` are *always* set, even when the user 
> never specified them on the command line.
> h2. Step 2 — `toConfiguration()` unconditionally writes default values using 
> the new key
> {code:java}
> // SavepointRestoreSettings.java
> public static void toConfiguration(
>         final SavepointRestoreSettings savepointRestoreSettings,
>         final Configuration configuration) {
>     configuration.set(
>             StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, // always set
>             savepointRestoreSettings.allowNonRestoredState());
>     configuration.set(
>             StateRecoveryOptions.RESTORE_MODE, // always set
>             savepointRestoreSettings.getRestoreMode());
>     final String savepointPath = savepointRestoreSettings.getRestorePath();
>     if (savepointPath != null) {
>         configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
>     }
> } {code}
> h2. Step 3 — User's SQL `SET` configuration is overridden
> When the user configures the claim mode via SQL `SET` statements (or Flink 
> configuration files), the value may be written using either the old or new 
> key:
> {code:sql}
> -- SQL
> -- Use the new key:
> SET 'execution.state-recovery.claim-mode' = 'CLAIM';
> -- or the deprecated key:
> SET 'execution.savepoint-restore-mode' = 'CLAIM'; {code}
> However, Step 2 has already written the default value (`NO_CLAIM`) under the 
> {*}new key{*}. Because the new key takes precedence over the deprecated one, 
> a value the user sets with the old key is shadowed by that default.
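The interaction of the three steps can be reproduced in a small, self-contained model. This is only a sketch under stated assumptions: a `Map` stands in for Flink's `Configuration`, the lookup order follows the rules described in Background, and `ClaimModeOverrideDemo` with its methods is a hypothetical name, not Flink code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of Steps 1-3; a plain Map stands in for Flink's Configuration.
final class ClaimModeOverrideDemo {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    // Assumed lookup order: new key, then deprecated key, then the default NO_CLAIM.
    static String effectiveClaimMode(Map<String, String> conf) {
        return conf.getOrDefault(NEW_KEY, conf.getOrDefault(OLD_KEY, "NO_CLAIM"));
    }

    // Steps 1 and 2: no claim-mode flag was given on the command line,
    // yet the default is still written under the new key.
    static Map<String, String> configurationFromCommandLine() {
        Map<String, String> conf = new HashMap<>();
        conf.put(NEW_KEY, "NO_CLAIM");
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = configurationFromCommandLine();
        // Step 3: SET 'execution.savepoint-restore-mode' = 'CLAIM';
        conf.put(OLD_KEY, "CLAIM");
        System.out.println(effectiveClaimMode(conf)); // prints NO_CLAIM
    }
}
```

If the `SET` statement uses the new key instead, the `put` in Step 3 overwrites the default and the model resolves to `CLAIM`, which matches the behavior reported above.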
> h2. Root Cause
> `SavepointRestoreSettings.toConfiguration()` does not distinguish between 
> "user explicitly set this value via command line" and "this is a default 
> because the user didn't specify anything." It always writes both 
> `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE`, even when they should 
> remain unset to allow downstream configuration (e.g., SQL `SET`, 
> `flink-conf.yaml`) to take effect.
>  
> h1. Suggested Fix
> Modify `SavepointRestoreSettings.toConfiguration()` to skip writing 
> `RESTORE_MODE` and
> `SAVEPOINT_IGNORE_UNCLAIMED_STATE` when they were not explicitly provided by 
> the user.
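One possible shape of this fix is sketched below. It is not the actual patch: the class `ExplicitOnlySettings`, the use of `Optional` to mark "not provided on the command line", and the `Map`-based configuration are all assumptions made for illustration, and only the claim mode is modelled (the same idea would apply to `SAVEPOINT_IGNORE_UNCLAIMED_STATE`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the suggested fix; names and the Map-based configuration are illustrative.
final class ExplicitOnlySettings {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    // Empty means "not given on the command line"; a present value was set explicitly.
    private final Optional<String> restoreMode;

    ExplicitOnlySettings(Optional<String> restoreMode) {
        this.restoreMode = restoreMode;
    }

    // Writes the claim mode only when it was explicitly provided, leaving the key unset otherwise.
    void toConfiguration(Map<String, String> conf) {
        restoreMode.ifPresent(mode -> conf.put(NEW_KEY, mode));
    }

    // Assumed lookup order: new key, then deprecated key, then the default NO_CLAIM.
    static String effectiveClaimMode(Map<String, String> conf) {
        return conf.getOrDefault(NEW_KEY, conf.getOrDefault(OLD_KEY, "NO_CLAIM"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        new ExplicitOnlySettings(Optional.empty()).toConfiguration(conf); // nothing written
        conf.put(OLD_KEY, "CLAIM"); // SET 'execution.savepoint-restore-mode' = 'CLAIM';
        System.out.println(effectiveClaimMode(conf)); // prints CLAIM
    }
}
```

With the key left unset, the deprecated-key fallback can take effect again, while an explicit command-line value would still be written and win.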
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
