zoltar9264 commented on code in PR #20152:
URL: https://github.com/apache/flink/pull/20152#discussion_r930584225


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java:
##########
@@ -87,9 +87,14 @@ protected <K> CheckpointableKeyedStateBackend<K> restore(
         String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
         ExecutionConfig executionConfig = env.getExecutionConfig();
 
+        env.getAsyncOperationsThreadPool();
+
         ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();
         CheckpointableKeyedStateBackend<K> keyedStateBackend =
                 ChangelogBackendRestoreOperation.restore(
+                        env.getJobID(),
+                        env.getAsyncOperationsThreadPool(),
+                        env.getTaskManagerInfo().getConfiguration(),

Review Comment:
   Hi @fredia, thanks for the reply. I'm not suggesting that we pass 
PERIODIC_MATERIALIZATION_INTERVAL directly. StateChangelogStorage may have 
different implementations, each with its own options. I think an 
implementation-specific configuration should not be exposed in the interface.
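
   To illustrate the idea, here is a minimal sketch. Every name in it 
(`ChangelogStorageSketch`, `StorageLoaderSketch`, the option keys) is 
hypothetical and is not Flink's actual API. The loader only accepts a generic 
configuration, and each implementation reads the options it understands:

```java
import java.util.Map;

// Hypothetical names throughout; none of these are Flink classes or option keys.
interface ChangelogStorageSketch {
    String describe();
}

// A file-system-like implementation that reads only its own option.
final class FsStorageSketch implements ChangelogStorageSketch {
    private final String basePath;

    FsStorageSketch(Map<String, String> config) {
        this.basePath = config.getOrDefault("fs.base-path", "/tmp/changelog");
    }

    @Override
    public String describe() {
        return "fs:" + basePath;
    }
}

// An in-memory implementation with a different, unrelated option.
final class InMemoryStorageSketch implements ChangelogStorageSketch {
    private final int maxEntries;

    InMemoryStorageSketch(Map<String, String> config) {
        this.maxEntries = Integer.parseInt(config.getOrDefault("memory.max-entries", "1000"));
    }

    @Override
    public String describe() {
        return "memory:" + maxEntries;
    }
}

final class StorageLoaderSketch {
    // The signature mentions only the generic config, never an
    // implementation-specific option.
    static ChangelogStorageSketch load(String identifier, Map<String, String> config) {
        switch (identifier) {
            case "fs":
                return new FsStorageSketch(config);
            case "memory":
                return new InMemoryStorageSketch(config);
            default:
                throw new IllegalArgumentException("Unknown storage: " + identifier);
        }
    }
}
```

   With this shape, adding a new implementation or a new option does not 
change the `load` signature; only the implementation that owns the option 
needs to know about it.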



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
