Han Yin created FLINK-39673:
-------------------------------

             Summary: SQL jobs may ignore user-configured claim mode with 
deprecated config key
                 Key: FLINK-39673
                 URL: https://issues.apache.org/jira/browse/FLINK-39673
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.20.4
            Reporter: Han Yin


h1. Background

Since Flink 1.20, the state recovery configuration keys were renamed:

 
||Old Key (deprecated)||New Key||
|execution.savepoint-restore-mode|execution.state-recovery.claim-mode|
The new keys are defined in `StateRecoveryOptions` with 
`withDeprecatedKeys(...)` pointing to the old keys:
{code:java}
// StateRecoveryOptions.java
public static final ConfigOption<RestoreMode> RESTORE_MODE =
    key("execution.state-recovery.claim-mode")
        .enumType(RestoreMode.class)
        .defaultValue(RestoreMode.DEFAULT)  // DEFAULT = NO_CLAIM
        .withDeprecatedKeys("execution.savepoint-restore-mode")
        ...{code}
Flink's `withDeprecatedKeys` mechanism defines the following resolution rules 
(see [FLIP-406|https://issues.apache.org/jira/browse/FLINK-34255] for details):
 * If the user configures *only the new key* 
(`execution.state-recovery.claim-mode`), it takes effect.
 * If the user configures *only the old (deprecated) key* 
(`execution.savepoint-restore-mode`), it also takes effect — 
`withDeprecatedKeys` ensures backward compatibility by falling back to the old 
key.
 * If *both* keys are configured, the *new key takes precedence* over the 
deprecated one.
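
The three rules above can be illustrated with a minimal sketch. It models the configuration as a plain `Map` and is not Flink's actual lookup code; the class `DeprecatedKeyLookup` and the method `resolveClaimMode` are hypothetical names introduced only for this example.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the resolution rules; NOT Flink's implementation.
final class DeprecatedKeyLookup {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";
    static final String DEFAULT_MODE = "NO_CLAIM";

    // New key first, then the deprecated key, then the default value.
    static String resolveClaimMode(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        if (conf.containsKey(OLD_KEY)) {
            return conf.get(OLD_KEY);
        }
        return DEFAULT_MODE;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveClaimMode(conf)); // NO_CLAIM (nothing set)

        conf.put(OLD_KEY, "CLAIM");
        System.out.println(resolveClaimMode(conf)); // CLAIM (fallback to old key)

        conf.put(NEW_KEY, "NO_CLAIM");
        System.out.println(resolveClaimMode(conf)); // NO_CLAIM (new key wins)
    }
}
{code}
The last case is the one that matters for this issue: once any value is stored under the new key, a value stored under the deprecated key is no longer consulted.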
h1. Problem and Steps to Reproduce

Start a Flink standalone cluster.

Submit a SQL job restoring from a checkpoint. For example:

 
{code:sql}
-- SQL

SET 'execution.savepoint.path' = 'hdfs:///checkpoints/xxx/chk-xx';
SET 'execution.state-recovery.claim-mode' = 'CLAIM';

CREATE TABLE source_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '10'
);

CREATE TABLE sink_table (
    word       STRING,
    word_count BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT word, COUNT(*) AS word_count
FROM source_table
GROUP BY word; {code}
 
The SQL above sets the claim mode using the {*}new key{*}. In this case, the 
job restores in *CLAIM* mode as expected.
 

Now change the 2nd `SET` statement to use the *deprecated key* instead:

 
{code:sql}
SET 'execution.savepoint-restore-mode' = 'CLAIM';{code}
 
{*}Expected{*}: The job should still restore in *CLAIM* mode — per the 
`withDeprecatedKeys` contract, the old key should be honored when the new key 
is not explicitly set.

{*}Actual{*}: The job restores in *NO_CLAIM* mode — the default value — which 
directly conflicts with what the user explicitly specified via the SQL `SET` 
statement. The practical consequence is that Flink does *not* claim ownership 
of the restored checkpoint. As the job continues running and produces newer 
checkpoints that subsume the old ones, the original checkpoint provided above 
will *never be cleaned up* and will remain orphaned on HDFS indefinitely, 
leading to unnecessary storage consumption.
h1. Cause: User-configured value can be overridden by default value
When a SQL job is submitted through SQL Gateway, the following happens:
h2. Step 1: Command-line parsing creates `SavepointRestoreSettings` with defaults

`CliFrontendParser.createSavepointRestoreSettings()` is called. When the user 
does *not* provide `--claimMode` / `--restoreMode` on the command line, the 
method falls into the `else` branch and uses the default value:
{code:java}
// CliFrontendParser.java
public static SavepointRestoreSettings 
createSavepointRestoreSettings(CommandLine commandLine) {
    if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
        ...
        if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
            restoreMode = ...;
        } else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
            restoreMode = ...;
        } else {
            restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue(); // NO_CLAIM
        }
        return SavepointRestoreSettings.forPath(savepointPath, 
allowNonRestoredState, restoreMode);
    }
    ...
} {code}
Note the asymmetry in how these settings are subsequently written to the 
configuration (Step 2): `SAVEPOINT_PATH` is set conditionally (only when 
non-null), but `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` are 
*always* set, even when the user never specified them on the command line.
h2. Step 2: `toConfiguration()` unconditionally writes default values using the new key
{code:java}
// SavepointRestoreSettings.java
public static void toConfiguration(
        final SavepointRestoreSettings savepointRestoreSettings,
        final Configuration configuration) {
    configuration.set(
            StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, // always set
            savepointRestoreSettings.allowNonRestoredState());
    configuration.set(
            StateRecoveryOptions.RESTORE_MODE, // always set
            savepointRestoreSettings.getRestoreMode());
    final String savepointPath = savepointRestoreSettings.getRestorePath();
    if (savepointPath != null) {
        configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
    }
} {code}
h2. Step 3 — User's SQL `SET` configuration is overridden

When the user configures the claim mode via SQL `SET` statements (or Flink 
configuration files), the value may be written using either the old or new key:
{code:sql}
-- SQL
-- Use the new key
SET 'execution.state-recovery.claim-mode' = 'CLAIM';
-- or the deprecated key:
SET 'execution.savepoint-restore-mode' = 'CLAIM'; {code}
However, Step 2 has already written the default value (`NO_CLAIM`) under the 
{*}new key{*}. Because the new key takes precedence whenever both keys are 
present, the `CLAIM` value that the user set with the deprecated key is never 
consulted and the default wins.
h2. Root Cause
`SavepointRestoreSettings.toConfiguration()` does not distinguish between "user 
explicitly set this value via command line" and "this is a default because the 
user didn't specify anything." It always writes both `RESTORE_MODE` and 
`SAVEPOINT_IGNORE_UNCLAIMED_STATE`, even when they should remain unset to allow 
downstream configuration (e.g., SQL `SET`, `flink-conf.yaml`) to take effect.
 
h1. Suggested Fix
Modify `SavepointRestoreSettings.toConfiguration()` to skip writing 
`RESTORE_MODE` and
`SAVEPOINT_IGNORE_UNCLAIMED_STATE` when they were not explicitly provided by 
the user.
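
One possible shape of this fix, shown as a minimal sketch rather than a patch: it models the configuration as a plain `Map` and tracks "not provided" as `null`. The class `RestoreSettingsSketch` and the methods `writeAlways`, `writeIfExplicit` and `resolve` are hypothetical names for illustration only. They are not part of Flink's API, and the real change would have to be made in `SavepointRestoreSettings` and its callers.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the suggested fix; NOT Flink's API.
final class RestoreSettingsSketch {
    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";
    static final String DEFAULT_MODE = "NO_CLAIM";

    // null means "not given on the command line".
    private final String claimMode;

    RestoreSettingsSketch(String claimMode) {
        this.claimMode = claimMode;
    }

    // Behavior described in Step 2: the default is always written under the new key.
    void writeAlways(Map<String, String> conf) {
        conf.put(NEW_KEY, claimMode != null ? claimMode : DEFAULT_MODE);
    }

    // Suggested behavior: write only a value the user explicitly provided.
    void writeIfExplicit(Map<String, String> conf) {
        if (claimMode != null) {
            conf.put(NEW_KEY, claimMode);
        }
    }

    // Same precedence as the deprecated-key rules: new key, old key, default.
    static String resolve(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        return conf.getOrDefault(OLD_KEY, DEFAULT_MODE);
    }

    public static void main(String[] args) {
        // No claim mode on the command line; the user set the deprecated key via SQL SET.
        RestoreSettingsSketch settings = new RestoreSettingsSketch(null);

        Map<String, String> current = new HashMap<>();
        current.put(OLD_KEY, "CLAIM");
        settings.writeAlways(current);
        System.out.println(resolve(current)); // NO_CLAIM: default shadows the user's value

        Map<String, String> fixed = new HashMap<>();
        fixed.put(OLD_KEY, "CLAIM");
        settings.writeIfExplicit(fixed);
        System.out.println(resolve(fixed)); // CLAIM: the deprecated key is honored
    }
}
{code}
With this approach an explicit command-line value would still be written under the new key and would therefore still take precedence over a deprecated key set elsewhere, which matches the resolution rules described in the Background section.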
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
