This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dae2eb5b61f71b9453a73e4f0b3c69fd28f54ebf Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Mon Nov 20 17:05:00 2023 +0800 [FLINK-33581][core] Deprecate getter/setter methods related to state backend in the StreamExecutionEnvironment. --- .../examples/statemachine/StateMachineExample.java | 10 ++++--- .../environment/StreamExecutionEnvironment.java | 33 ++++++++++++++++++++-- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index f905043b2ee..940b836d0bf 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -30,15 +30,14 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; -import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; @@ -89,13 +88,16 @@ public class StateMachineExample { final String stateBackend = params.get("backend", "memory"); if ("hashmap".equals(stateBackend)) { final String checkpointDir = params.get("checkpoint-dir"); - env.setStateBackend(new HashMapStateBackend()); + configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap"); configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); } else if ("rocks".equals(stateBackend)) { final String checkpointDir = params.get("checkpoint-dir"); boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false); - env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints)); + configuration.set( + StateBackendOptions.STATE_BACKEND, + "org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory"); + configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incrementalCheckpoints); configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 52b5e30034f..9069b3a0d3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -202,8 +202,15 @@ public class StreamExecutionEnvironment implements AutoCloseable { private boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled = true; - /** The state backend used for storing k/v state and state snapshots. */ - private StateBackend defaultStateBackend; + /** + * The state backend used for storing k/v state and state snapshots. + * + * @deprecated The field is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * field will be removed entirely. + */ + @Deprecated private StateBackend defaultStateBackend; /** Whether to enable ChangelogStateBackend, default value is unset. */ private TernaryBoolean changelogStateBackendEnabled = TernaryBoolean.UNDEFINED; @@ -665,10 +672,24 @@ public class StreamExecutionEnvironment implements AutoCloseable { * org.apache.flink.runtime.state.CheckpointStorage} which configures how and where state * backends persist during a checkpoint. * + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. It is recommended to switch to using the ConfigOptions + * provided for configuring state backend like the following code snippet: + * <pre>{@code + * Configuration config = new Configuration(); + * config.set(StateBackendOptions.STATE_BACKEND, "hashmap"); + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + * }</pre> + * For more details on using ConfigOption for state backend configuration, please refer to + * the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/state_backends">state-backends</a> * @return This StreamExecutionEnvironment itself, to allow chaining of function calls. * @see #getStateBackend() * @see CheckpointConfig#setCheckpointStorage( org.apache.flink.runtime.state.CheckpointStorage) */ + @Deprecated @PublicEvolving public StreamExecutionEnvironment setStateBackend(StateBackend backend) { this.defaultStateBackend = Preconditions.checkNotNull(backend); @@ -678,8 +699,16 @@ public class StreamExecutionEnvironment implements AutoCloseable { /** * Gets the state backend that defines how to store and checkpoint state. * + * @deprecated The method is marked as deprecated because starting from Flink 1.19, the usage of + * all complex Java objects related to configuration, including their getter and setter + * methods, should be replaced by ConfigOption. In a future major version of Flink, this + * method will be removed entirely. It is recommended to find which state backend is used by + * state backend ConfigOption. For more details on using ConfigOption for state backend + * configuration, please refer to the Flink documentation: <a + * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/state_backends">state-backends</a> * @see #setStateBackend(StateBackend) */ + @Deprecated @PublicEvolving public StateBackend getStateBackend() { return defaultStateBackend;