This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dae2eb5b61f71b9453a73e4f0b3c69fd28f54ebf
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Mon Nov 20 17:05:00 2023 +0800

    [FLINK-33581][core] Deprecate getter/setter methods related to state 
backend in the StreamExecutionEnvironment.
---
 .../examples/statemachine/StateMachineExample.java | 10 ++++---
 .../environment/StreamExecutionEnvironment.java    | 33 ++++++++++++++++++++--
 2 files changed, 37 insertions(+), 6 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index f905043b2ee..940b836d0bf 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -30,15 +30,14 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.connector.datagen.source.GeneratorFunction;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -89,13 +88,16 @@ public class StateMachineExample {
         final String stateBackend = params.get("backend", "memory");
         if ("hashmap".equals(stateBackend)) {
             final String checkpointDir = params.get("checkpoint-dir");
-            env.setStateBackend(new HashMapStateBackend());
+            configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
             configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
"filesystem");
             configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
         } else if ("rocks".equals(stateBackend)) {
             final String checkpointDir = params.get("checkpoint-dir");
             boolean incrementalCheckpoints = 
params.getBoolean("incremental-checkpoints", false);
-            env.setStateBackend(new 
EmbeddedRocksDBStateBackend(incrementalCheckpoints));
+            configuration.set(
+                    StateBackendOptions.STATE_BACKEND,
+                    
"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory");
+            configuration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
incrementalCheckpoints);
             configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, 
"filesystem");
             configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 52b5e30034f..9069b3a0d3c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -202,8 +202,15 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
 
     private boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled = 
true;
 
-    /** The state backend used for storing k/v state and state snapshots. */
-    private StateBackend defaultStateBackend;
+    /**
+     * The state backend used for storing k/v state and state snapshots.
+     *
+     * @deprecated The field is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     field will be removed entirely.
+     */
+    @Deprecated private StateBackend defaultStateBackend;
 
     /** Whether to enable ChangelogStateBackend, default value is unset. */
     private TernaryBoolean changelogStateBackendEnabled = 
TernaryBoolean.UNDEFINED;
@@ -665,10 +672,24 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      * org.apache.flink.runtime.state.CheckpointStorage} which configures how 
and where state
      * backends persist during a checkpoint.
      *
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to switch to 
using the ConfigOptions
+     *     provided for configuring state backend like the following code 
snippet:
+     *     <pre>{@code
+     * Configuration config = new Configuration();
+     * config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+     * StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+     * }</pre>
+     *     For more details on using ConfigOption for state backend 
configuration, please refer to
+     *     the Flink documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/state_backends";>state-backends</a>
      * @return This StreamExecutionEnvironment itself, to allow chaining of 
function calls.
      * @see #getStateBackend()
      * @see CheckpointConfig#setCheckpointStorage( 
org.apache.flink.runtime.state.CheckpointStorage)
      */
+    @Deprecated
     @PublicEvolving
     public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
         this.defaultStateBackend = Preconditions.checkNotNull(backend);
@@ -678,8 +699,16 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
     /**
      * Gets the state backend that defines how to store and checkpoint state.
      *
+     * @deprecated The method is marked as deprecated because starting from 
Flink 1.19, the usage of
+     *     all complex Java objects related to configuration, including their 
getter and setter
+     *     methods, should be replaced by ConfigOption. In a future major 
version of Flink, this
+     *     method will be removed entirely. It is recommended to find which 
state backend is used by
+     *     state backend ConfigOption. For more details on using ConfigOption 
for state backend
+     *     configuration, please refer to the Flink documentation: <a
+     *     
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/state/state_backends";>state-backends</a>
      * @see #setStateBackend(StateBackend)
      */
+    @Deprecated
     @PublicEvolving
     public StateBackend getStateBackend() {
         return defaultStateBackend;

Reply via email to