[
https://issues.apache.org/jira/browse/FLINK-39673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Han Yin updated FLINK-39673:
----------------------------
Description:
h1. Background
Since Flink 1.20, the state recovery configuration keys were renamed:
||Old Key (deprecated)||New Key||
|execution.savepoint-restore-mode|execution.state-recovery.claim-mode|
The new keys are defined in `StateRecoveryOptions` with
`withDeprecatedKeys(...)` pointing to the old keys:
{code:java}
// StateRecoveryOptions.java
public static final ConfigOption<RestoreMode> RESTORE_MODE =
        key("execution.state-recovery.claim-mode")
                .enumType(RestoreMode.class)
                .defaultValue(RestoreMode.DEFAULT) // DEFAULT = NO_CLAIM
                .withDeprecatedKeys("execution.savepoint-restore-mode")
                ...
{code}
Flink's `withDeprecatedKeys` mechanism defines the following resolution rules (see FLIP-406 for details):
* If the user configures *only the new key* (`execution.state-recovery.claim-mode`), it takes effect.
* If the user configures *only the old (deprecated) key* (`execution.savepoint-restore-mode`), it also takes effect: `withDeprecatedKeys` ensures backward compatibility by falling back to the old key.
* If *both* keys are configured, the *new key takes precedence* over the deprecated one.
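The three rules above can be illustrated with a small, self-contained model. This is a simplified sketch of the lookup order over a plain `Map`, not Flink's `Configuration` implementation; the class `DeprecatedKeyResolution` and its methods are hypothetical names chosen for illustration, and only the two key names come from this issue.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model (NOT Flink's Configuration class) of how an option declared with
 * withDeprecatedKeys(...) is resolved: new key first, then deprecated keys, then default.
 */
final class DeprecatedKeyResolution {

    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    /** Looks up a value following the three resolution rules. */
    static String resolve(
            Map<String, String> conf, String key, List<String> deprecatedKeys, String defaultValue) {
        // Rules 1 and 3: the new key always wins when it is present.
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        // Rule 2: otherwise fall back to the deprecated keys, in declaration order.
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return conf.get(oldKey);
            }
        }
        // Neither key is present: use the declared default.
        return defaultValue;
    }

    /** Claim-mode lookup with NO_CLAIM as the default, as in StateRecoveryOptions. */
    static String claimMode(Map<String, String> conf) {
        return resolve(conf, NEW_KEY, List.of(OLD_KEY), "NO_CLAIM");
    }

    public static void main(String[] args) {
        System.out.println(claimMode(Map.of(NEW_KEY, "CLAIM"))); // CLAIM
        System.out.println(claimMode(Map.of(OLD_KEY, "CLAIM"))); // CLAIM
        System.out.println(claimMode(Map.of(NEW_KEY, "NO_CLAIM", OLD_KEY, "CLAIM"))); // NO_CLAIM
        System.out.println(claimMode(new HashMap<>())); // NO_CLAIM
    }
}
{code}

The third call is the case that matters for this bug: once anything has been written under the new key, a value under the deprecated key is no longer consulted.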
h1. Problem and Steps to Reproduce
# Start a Flink standalone cluster.
# Submit a SQL job that restores from a checkpoint, for example:
{code:sql}
SET 'execution.savepoint.path' = 'hdfs:///checkpoints/xxx/chk-xx';
SET 'execution.state-recovery.claim-mode' = 'CLAIM';
CREATE TABLE source_table (
word STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5',
'fields.word.length' = '10'
);
CREATE TABLE sink_table (
word STRING,
word_count BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT word, COUNT(*) AS word_count
FROM source_table
GROUP BY word; {code}
The SQL above sets the claim mode using the {*}new key{*}. In this case, the
job restores in *CLAIM* mode as expected.
Now change the second `SET` statement to use the *deprecated key* instead:
{code:sql}
SET 'execution.savepoint-restore-mode' = 'CLAIM';
{code}
{*}Expected{*}: The job should still restore in *CLAIM* mode — per the
`withDeprecatedKeys` contract, the old key should be honored when the new key
is not explicitly set.
{*}Actual{*}: The job restores in *NO_CLAIM* mode — the default value — which
directly conflicts with what the user explicitly specified via the SQL `SET`
statement. The practical consequence is that Flink does *not* claim ownership
of the restored checkpoint. As the job continues running and produces newer
checkpoints that subsume the old ones, the original checkpoint provided above
will *never be cleaned up* and will remain orphaned on HDFS indefinitely,
leading to unnecessary storage consumption.
h1. Cause: User-configured value can be overridden by default value
When a SQL job is submitted through the SQL Gateway, the following happens:
h2. Step 1: CommandLine parsing creates `SavepointRestoreSettings` with defaults
`CliFrontendParser.createSavepointRestoreSettings()` is called. When the user
does *not* provide `-claimMode` or `-restoreMode` on the command line, the
method falls into the `else` branch and uses the default value:
{code:java}
// CliFrontendParser.java
public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
    if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
        ...
        if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
            restoreMode = ...;
        } else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
            restoreMode = ...;
        } else {
            restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue(); // NO_CLAIM
        }
        return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState, restoreMode);
    }
    ...
}
{code}
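Note that the resulting `SavepointRestoreSettings` carries only the resolved value: a restore mode filled in from the default is indistinguishable from one the user passed explicitly. Step 2 then writes these values asymmetrically: `SAVEPOINT_PATH` is set only when it is non-null, but `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` are *always* set, even when the user never specified them on the command line.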
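To make the information loss in Step 1 concrete, here is a heavily simplified model of the if / else-if / else chain above. It is a sketch, not Flink code: the real method works on a commons-cli `CommandLine`, while this hypothetical `CliDefaulting` class scans a plain list of arguments and only handles the `-claimMode` and `-restoreMode` flags named in this issue.

{code:java}
import java.util.List;

/**
 * Hypothetical model of the defaulting in CliFrontendParser.createSavepointRestoreSettings():
 * when no mode flag is given, the "not specified" case collapses into a concrete value.
 */
final class CliDefaulting {

    // Default of StateRecoveryOptions.RESTORE_MODE (RestoreMode.DEFAULT = NO_CLAIM).
    static final String DEFAULT_MODE = "NO_CLAIM";

    static String parseRestoreMode(List<String> args) {
        int i = args.indexOf("-claimMode");
        if (i >= 0 && i + 1 < args.size()) {
            return args.get(i + 1);
        }
        i = args.indexOf("-restoreMode");
        if (i >= 0 && i + 1 < args.size()) {
            return args.get(i + 1);
        }
        // else branch: the user said nothing, but a value is produced anyway.
        return DEFAULT_MODE;
    }

    public static void main(String[] args) {
        String implicit = parseRestoreMode(List.of("-s", "hdfs:///checkpoints/xxx/chk-xx"));
        String explicit = parseRestoreMode(
                List.of("-s", "hdfs:///checkpoints/xxx/chk-xx", "-claimMode", "NO_CLAIM"));
        // Both are NO_CLAIM: code that only sees the returned value cannot tell them apart.
        System.out.println(implicit + " " + explicit);
    }
}
{code}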
h2. Step 2: `toConfiguration()` unconditionally writes default values using the new key
{code:java}
// SavepointRestoreSettings.java
public static void toConfiguration(
        final SavepointRestoreSettings savepointRestoreSettings,
        final Configuration configuration) {
    configuration.set(
            StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, // always set
            savepointRestoreSettings.allowNonRestoredState());
    configuration.set(
            StateRecoveryOptions.RESTORE_MODE, // always set
            savepointRestoreSettings.getRestoreMode());
    final String savepointPath = savepointRestoreSettings.getRestorePath();
    if (savepointPath != null) { // set only when a path is present
        configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath);
    }
}
{code}
h2. Step 3: User's SQL `SET` configuration is overridden
When the user configures the claim mode via SQL `SET` statements (or Flink
configuration files), the value may be written using either the old or new key:
{code:sql}
-- Use the new key
SET 'execution.state-recovery.claim-mode' = 'CLAIM';
-- or the deprecated key:
SET 'execution.savepoint-restore-mode' = 'CLAIM'; {code}
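However, Step 2 has already written the default value (`NO_CLAIM`) under the {*}new key{*}. If the user's `SET` uses the new key, it overwrites that entry and the job restores in CLAIM mode. If it uses the deprecated key, both keys are now present in the configuration, so the "new key takes precedence" rule applies and the default `NO_CLAIM` silently wins over the user's `CLAIM`.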
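The sequence from Steps 2 and 3 can be replayed on a plain map to show why only the deprecated key is affected. This is a simplified model under the assumption that the CLI-derived settings are applied before the SQL `SET` statement; `ClaimModeOverride` and its methods are hypothetical, and the lookup mirrors the resolution rules from the Background section rather than Flink's actual `Configuration` code.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical replay of Steps 2 and 3 on a plain map (a model, not Flink's Configuration). */
final class ClaimModeOverride {

    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    /** New key first, then the deprecated key, then the default. */
    static String claimMode(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        if (conf.containsKey(OLD_KEY)) {
            return conf.get(OLD_KEY);
        }
        return "NO_CLAIM";
    }

    /** Applies the CLI-derived default first, then the user's SET under the given key. */
    static String effectiveMode(String userKey) {
        Map<String, String> conf = new HashMap<>();
        // Step 2: toConfiguration() unconditionally writes RESTORE_MODE under the new key.
        conf.put(NEW_KEY, "NO_CLAIM");
        // Step 3: the SQL SET statement stores CLAIM under whichever key the user typed.
        conf.put(userKey, "CLAIM");
        return claimMode(conf);
    }

    public static void main(String[] args) {
        System.out.println("new key: " + effectiveMode(NEW_KEY)); // new key: CLAIM
        System.out.println("old key: " + effectiveMode(OLD_KEY)); // old key: NO_CLAIM
    }
}
{code}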
h2. Root Cause
`SavepointRestoreSettings.toConfiguration()` does not distinguish between "user
explicitly set this value via command line" and "this is a default because the
user didn't specify anything." It always writes both `RESTORE_MODE` and
`SAVEPOINT_IGNORE_UNCLAIMED_STATE`, even when they should remain unset to allow
downstream configuration (e.g., SQL `SET`, `flink-conf.yaml`) to take effect.
h1. Suggested Fix
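Modify `SavepointRestoreSettings.toConfiguration()` to skip writing `RESTORE_MODE` and `SAVEPOINT_IGNORE_UNCLAIMED_STATE` when they were not explicitly provided by the user. Since `SavepointRestoreSettings` currently stores only the resolved values, this also requires recording whether each value was given explicitly on the command line.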
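A minimal sketch of the suggested fix, assuming the settings object can remember whether the restore mode was given explicitly. Everything here is hypothetical: `ExplicitOnlySettings` stands in for `SavepointRestoreSettings`, a plain map stands in for `Configuration`, and only `RESTORE_MODE` is modelled (`SAVEPOINT_IGNORE_UNCLAIMED_STATE` would be handled the same way).

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: write RESTORE_MODE only if the user actually supplied it. */
final class ExplicitOnlySettings {

    static final String NEW_KEY = "execution.state-recovery.claim-mode";
    static final String OLD_KEY = "execution.savepoint-restore-mode";

    /** Empty means "not given on the command line" instead of an eager NO_CLAIM. */
    private final Optional<String> restoreMode;

    ExplicitOnlySettings(Optional<String> restoreMode) {
        this.restoreMode = restoreMode;
    }

    /** Leaves the key untouched when no explicit value exists. */
    void toConfiguration(Map<String, String> conf) {
        restoreMode.ifPresent(mode -> conf.put(NEW_KEY, mode));
    }

    /** Same lookup order as the resolution rules: new key, deprecated key, default. */
    static String claimMode(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        }
        if (conf.containsKey(OLD_KEY)) {
            return conf.get(OLD_KEY);
        }
        return "NO_CLAIM";
    }

    /** CLI settings applied first, then the user's SQL SET with the deprecated key. */
    static String effectiveMode(ExplicitOnlySettings cli) {
        Map<String, String> conf = new HashMap<>();
        cli.toConfiguration(conf);
        conf.put(OLD_KEY, "CLAIM");
        return claimMode(conf);
    }

    public static void main(String[] args) {
        // No -claimMode/-restoreMode: the deprecated-key SET now takes effect.
        System.out.println(effectiveMode(new ExplicitOnlySettings(Optional.empty()))); // CLAIM
        // An explicit command-line value is still written under the new key and wins.
        System.out.println(effectiveMode(new ExplicitOnlySettings(Optional.of("NO_CLAIM")))); // NO_CLAIM
    }
}
{code}

In this model, users who configure nothing anywhere still get `NO_CLAIM`, because the default is applied at read time rather than written eagerly. A nullable field or a separate "explicitly set" flag would serve equally well as `Optional`.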
> SQL jobs may ignore user-configured claim mode with deprecated config key
> -------------------------------------------------------------------------
>
> Key: FLINK-39673
> URL: https://issues.apache.org/jira/browse/FLINK-39673
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.20.4
> Reporter: Han Yin
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)