azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250324277
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##########
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+       public RocksDBStateDownloader(int restoringThreadNum) {
+               super(restoringThreadNum);
+       }
 
-       static void transferAllStateDataToDirectory(
+       /**
+        * Transfer all state data to the target directory using specified 
number of threads.
+        *
+        * @param restoreStateHandle Handles used to retrieve the state data.
+        * @param dest The target directory which the state data will be stored.
+        * @param closeableRegistry Which all the inputStream/outputStream will 
be registered and unregistered.
+        *
+        * @throws Exception Thrown if can not transfer all the state data.
+        */
+       public void transferAllStateDataToDirectory(
                IncrementalKeyedStateHandle restoreStateHandle,
                Path dest,
-               int restoringThreadNum,
                CloseableRegistry closeableRegistry) throws Exception {
 
                final Map<StateHandleID, StreamStateHandle> sstFiles =
                        restoreStateHandle.getSharedState();
                final Map<StateHandleID, StreamStateHandle> miscFiles =
                        restoreStateHandle.getPrivateState();
 
-               downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-               downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+               downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   could `closeableRegistry` be shared the same way as `executorService` in 
`RocksDBStateDataTransfer`? Then we do not need to pass to all class methods.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to