Myasuka commented on a change in pull request #16153:
URL: https://github.com/apache/flink/pull/16153#discussion_r652468740



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -327,6 +331,7 @@ private static StateBackend 
loadFromApplicationOrConfigOrDefaultInternal(
      */
     public static StateBackend fromApplicationOrConfigOrDefault(

Review comment:
       We should add a log message stating whether the changelog state backend 
is enabled.
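
Something along these lines would be enough. This is only a sketch: `java.util.logging` stands in for the SLF4J logger that is passed around in the loader, and the class name, method name, and backend name are placeholders rather than anything in the PR.

```java
import java.util.logging.Logger;

final class ChangelogLoggingSketch {
    // java.util.logging is a stand-in here; the real loader receives an SLF4J logger.
    private static final Logger LOG = Logger.getLogger("StateBackendLoaderSketch");

    /** Builds the log line; backendName is a hypothetical display name. */
    static String describe(boolean changelogEnabled, String backendName) {
        return changelogEnabled
                ? "Changelog state backend is enabled, wrapping " + backendName + "."
                : "Changelog state backend is disabled, using " + backendName + " directly.";
    }

    public static void main(String[] args) {
        // Log both outcomes so the final decision is always visible in the logs.
        LOG.info(describe(true, "HashMapStateBackend"));
        LOG.info(describe(false, "HashMapStateBackend"));
    }
}
```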

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -607,6 +625,65 @@ public StateBackend getStateBackend() {
         return defaultStateBackend;
     }
 
+    /**
+     * Enable the change log for current state backend. this changelog allows 
operators to persist
+     * state changes in a very fine-grained manner, as described below:
+     *
+     * <p>Stateful operators write the state changes to that log (logging the 
state), in addition to
+     * applying them to the state tables in RocksDB or the in-mem Hashtable.
+     *
+     * <p>An operator can acknowledge a checkpoint as soon as the changes in 
the log have reached
+     * the durable checkpoint storage.
+     *
+     * <p>The state tables are persisted periodically, independent of the 
checkpoints. We call this
+     * the materialization of the state on the checkpoint storage.
+     *
+     * <p>Once the state is materialized on checkpoint storage, the state 
changelog can be truncated
+     * to the corresponding point.
+     *
+     * <p>It establish a way to drastically reduce the checkpoint interval for 
streaming
+     * applications across state backends. For more details please check the 
FLIP-158.
+     *
+     * <p>If this method is not called explicitly, it means no preference for 
enabling the change
+     * log. Configs for change log enabling will override in different config 
levels
+     * (job/local/cluster).
+     *
+     * @param enabled true if enable the change log for state backend 
explicitly, otherwise disable
+     *     the change log.
+     * @return This StreamExecutionEnvironment itself, to allow chaining of 
function calls.
+     * @see #isChangelogStateBackendEnabled()
+     */
+    @PublicEvolving
+    public StreamExecutionEnvironment enableChangelogStateBackend(boolean 
enabled) {
+        if (enabled) {
+            try {
+                this.defaultStateBackend =
+                        StateBackendLoader.loadChangelogStateBackendIfNeeded(

Review comment:
       I don't think we should load the state backend as soon as the changelog 
state backend is enabled, because the client side might use a different 
classloader from the cluster runtime side. Just keep the original internal 
state backend and load it once the job is running on the cluster.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -327,6 +331,7 @@ private static StateBackend 
loadFromApplicationOrConfigOrDefaultInternal(
      */
     public static StateBackend fromApplicationOrConfigOrDefault(
             @Nullable StateBackend fromApplication,
+            @Nullable TernaryBoolean 
isChangeLogStateBackendEnableFromApplication,

Review comment:
       If `isChangeLogStateBackendEnableFromApplication` is `null`, it 
effectively means `unset` (mainly on the stream task side). However, 
`TernaryBoolean` is itself designed to represent the `unset` state. 
This mixes two ways of saying the same thing, so I suggest removing the 
`Nullable` possibility.
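
To illustrate the idea, here is a minimal sketch. `Ternary` is a local stand-in for `TernaryBoolean` (not the real class), and `ChangelogFlag` is a hypothetical holder used only to show a flag that starts out undefined and never accepts `null`.

```java
import java.util.Objects;

final class TernaryFlagSketch {
    /** Local stand-in for Flink's TernaryBoolean; not the real class. */
    enum Ternary { TRUE, FALSE, UNDEFINED }

    /** Hypothetical holder: starts as UNDEFINED and rejects null outright. */
    static final class ChangelogFlag {
        private Ternary value = Ternary.UNDEFINED;

        void set(Ternary newValue) {
            // "Unset" has exactly one representation, so null is an error.
            this.value = Objects.requireNonNull(newValue, "use UNDEFINED, not null");
        }

        Ternary get() {
            return value;
        }

        /** Resolves the flag, falling back to the given default when unset. */
        boolean resolve(boolean defaultValue) {
            return value == Ternary.UNDEFINED ? defaultValue : value == Ternary.TRUE;
        }
    }

    public static void main(String[] args) {
        ChangelogFlag flag = new ChangelogFlag();
        System.out.println(flag.get());
        flag.set(Ternary.FALSE);
        System.out.println(flag.resolve(true));
    }
}
```

With this shape the callee no longer needs a `== null` branch that converts to `UNDEFINED`.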

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -336,8 +341,18 @@ public static StateBackend 
fromApplicationOrConfigOrDefault(
                 loadFromApplicationOrConfigOrDefaultInternal(
                         fromApplication, config, classLoader, logger);
 
-        if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG)
-                && !(fromApplication instanceof DelegatingStateBackend)) {
+        if (isChangeLogStateBackendEnableFromApplication == null) {
+            isChangeLogStateBackendEnableFromApplication = 
TernaryBoolean.UNDEFINED;
+        }
+        Optional<Boolean> enableConfig =
+                
config.getOptional(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG);

Review comment:
       Why not use `Boolean enabledFromConfig = 
config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG);` directly?
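
For context on the difference between the two calls: `get` falls back to the option's default when the key is absent, while `getOptional` returns an empty `Optional` in that case. The sketch below mimics that behaviour with a plain map; the class, key, and default value are placeholders and not the real `Configuration` or the real option definition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ConfigLookupSketch {
    // Hypothetical key and default, for illustration only.
    static final String KEY = "changelog.enabled";
    static final boolean DEFAULT = false;

    private final Map<String, Boolean> values = new HashMap<>();

    void set(String key, boolean value) {
        values.put(key, value);
    }

    /** Like getOptional: empty when the key was never set; the default is ignored. */
    Optional<Boolean> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }

    /** Like get: falls back to the default when the key was never set. */
    boolean get(String key, boolean defaultValue) {
        return getOptional(key).orElse(defaultValue);
    }

    public static void main(String[] args) {
        ConfigLookupSketch config = new ConfigLookupSketch();
        System.out.println(config.getOptional(KEY).isPresent());
        System.out.println(config.get(KEY, DEFAULT));
        config.set(KEY, true);
        System.out.println(config.get(KEY, DEFAULT));
    }
}
```

If the code only needs the effective value and not the distinction between "unset" and "set to the default", the direct `get` is the simpler choice.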

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendLoadingTest.java
##########
@@ -183,16 +218,83 @@ public void testLoadingRocksDBStateBackend() throws 
Exception {
                 false);
     }
 
-    private Configuration config(String stateBackend) {
-        final Configuration config = config();
+    @Test
+    public void testConfigureEnvByConfiguration() throws Exception {

Review comment:
       If we decide to remove state backend loading from the streaming execution 
environment, the test methods `testConfigureEnvByConfiguration` and 
`testEnableChangelogStateBackendInStreamExecutionEnvironment` are no longer 
needed.

##########
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
##########
@@ -214,6 +215,8 @@ StreamConfig getConfig(
         config.setOperatorName(operatorID.toHexString());
         config.setOperatorID(operatorID);
         config.setStateBackend(stateBackend);
+        // This means leave this stateBackend unwrapped.

Review comment:
       ```suggestion
           // This means leaving this stateBackend unwrapped.
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
##########
@@ -73,6 +75,7 @@ public static void setBatchProperties(StreamGraph 
streamGraph, TableConfig table
         // scheduler)
         streamGraph.setJobType(JobType.BATCH);
         streamGraph.setStateBackend(null);
+        streamGraph.setChangeLogStateBackendEnabled(TernaryBoolean.UNDEFINED);

Review comment:
       If no state backend is needed for batch jobs, should we set the changelog 
flag to `TernaryBoolean.FALSE` instead?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -593,7 +598,20 @@ public CheckpointingMode getCheckpointingMode() {
      */
     @PublicEvolving
     public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
-        this.defaultStateBackend = Preconditions.checkNotNull(backend);
+        Preconditions.checkNotNull(backend);
+        if (changelogStateBackendEnabled.equals(TernaryBoolean.TRUE)) {
+            try {
+                this.defaultStateBackend =

Review comment:
       I don't think we should load the changelog state backend here. We could 
still keep the original internal state backend and load the real one within 
`StreamGraphGenerator` or `StreamTask`.
   Moreover, in that case there is no need to introduce 
`StateBackendLoader#loadChangelogStateBackendIfNeeded` and 
`StateBackendLoader#unwrapFromDelegatingStateBackend`.





