[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718365#comment-16718365 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446434187

@azagrebin Thank you for your explanation. Based on it, I will:
- first move `DirectExecutorService` into `org.apache.flink.runtime.concurrent` and change `Executor Executors#directExecutor()` to `DirectExecutorService Executors#directExecutorService()`; this will be done in a separate commit.
- then use `Executors#directExecutorService()` in the current patch to share the logic for `threadNum = 1` and `threadNum > 1`.

Did I understand correctly? If so, I will file an issue and implement the move first, then come back to complete this patch.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Speed up download file procedure when restore
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Congxian Qiu
> Assignee: Congxian Qiu
> Priority: Major
> Labels: pull-request-available
>
> On the current master branch, restore downloads the state files from DFS with a
> single thread. This could be sped up by downloading with multiple threads.
>
> At my company, state can reach several terabytes, so the restore procedure becomes
> a little slow. After a bit of digging, I found that the states are downloaded from
> DFS by a single thread; using multiple threads could speed this up.
> I measured the time needed to download ~2 terabytes of state from DFS:
> 640+ s with a single thread, and 130+ s with 5 download threads.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716776#comment-16716776 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446148631

@klion26 right, sorry for the confusion. I think we should move `DirectExecutorService` into `org.apache.flink.runtime.concurrent`. We already have `Executors.directExecutor()` there, so this could be deduplicated: `directExecutor()` could become `directExecutorService()` and return the `DirectExecutorService` singleton as an `ExecutorService` instead of an `Executor`. Then `DirectExecutor` is no longer needed in `Executors`. Could you do it as a separate commit?
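The idea behind a direct executor exposed as an `ExecutorService`, as discussed in the comment above, can be sketched roughly as follows. This is only an illustration built on the JDK's `AbstractExecutorService`; the class name `SketchDirectExecutorService` is hypothetical and this is not Flink's `DirectExecutorService` implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Flink's class): runs every task on the caller's thread. */
final class SketchDirectExecutorService extends AbstractExecutorService {

    private volatile boolean shutdown;

    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("executor already shut down");
        }
        // Synchronous execution: this call returns only after the task has finished.
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        // Tasks are never queued, so nothing is pending.
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        // Simplification for this sketch: no task is tracked as "still running".
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = new SketchDirectExecutorService();
        String caller = Thread.currentThread().getName();
        // submit() is inherited from AbstractExecutorService and delegates to execute().
        Future<String> worker = executor.submit(() -> Thread.currentThread().getName());
        System.out.println(caller.equals(worker.get())); // prints "true"
        executor.shutdown();
    }
}
```

Because the type is an `ExecutorService` rather than a bare `Executor`, callers can use `submit`, `invokeAll`, and `shutdown` the same way they would with a thread pool, which is what makes sharing one code path possible.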
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716503#comment-16716503 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 edited a comment on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446110193

@azagrebin Thank you for your reply. To unify the `threadNum > 1` and `threadNum = 1` cases in `RocksDbStateDataTransfer`, we should use `DirectExecutorService` for `threadNum = 1`. However, as I commented above, `DirectExecutorService` is located in test code: `flink-runtime/src/test/java/org/apache/runtime/util/DirectExecutorService`. So I have a question: did I miss something, or should I move `flink-runtime/src/test/java/org/apache/runtime/util/DirectExecutorService` to `flink-runtime/src/main/java/org/apache/runtime/util/DirectExecutorService`?
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716500#comment-16716500 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446110193

@azagrebin Hi. To unify the `threadNum > 1` and `threadNum = 1` cases, we should use `DirectExecutorService` for `threadNum = 1`. However, as I commented above, `DirectExecutorService` is located in test code: `flink-runtime/src/test/java/org/apache/runtime/util/DirectExecutorService`. So I have a question: did I miss something, or should I move `flink-runtime/src/test/java/org/apache/runtime/util/DirectExecutorService` to `flink-runtime/src/main/java/org/apache/runtime/util/DirectExecutorService`?
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714724#comment-16714724 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-445814814

@klion26, do you have an ETA for addressing the comments?
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709534#comment-16709534 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238908481

##
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+    public static void transferAllStateDataToDirectory(
+        IncrementalKeyedStateHandle restoreStateHandle,
+        Path dest,
+        int restoringThreadNum,
+        CloseableRegistry closeableRegistry) throws Exception {
+
+        final Map<StateHandleID, StreamStateHandle> sstFiles =
+            restoreStateHandle.getSharedState();
+        final Map<StateHandleID, StreamStateHandle> miscFiles =
+            restoreStateHandle.getPrivateState();
+
+        downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+        downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+    }
+
+    /**
+     * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+     * {@link StateHandleID}.
+     */
+    private static void downloadDataForAllStateHandles(
+        Map<StateHandleID, StreamStateHandle> stateHandleMap,
+        Path restoreInstancePath,
+        int restoringThreadNum,
+        CloseableRegistry closeableRegistry
+    ) throws Exception {
+
+        List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+        if (enableMultiThreadDownload(restoringThreadNum)) {
+            runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+        } else {
+            runTransferWithSingleThread(runnables);
+        }

Review comment:
I agree. I found `DirectExecutorService` in test code (`flink-runtime/src/test/java/org/apache/runtime/util/DirectExecutorService`). Did I miss something, or should I move it to `flink-runtime/src/main/java/org/apache/runtime/util/DirectExecutorService`?
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708881#comment-16708881 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238712692

##
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+    public static void transferAllStateDataToDirectory(
+        IncrementalKeyedStateHandle restoreStateHandle,
+        Path dest,
+        int restoringThreadNum,
+        CloseableRegistry closeableRegistry) throws Exception {
+
+        final Map<StateHandleID, StreamStateHandle> sstFiles =
+            restoreStateHandle.getSharedState();
+        final Map<StateHandleID, StreamStateHandle> miscFiles =
+            restoreStateHandle.getPrivateState();
+
+        downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+        downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+    }
+
+    /**
+     * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+     * {@link StateHandleID}.
+     */
+    private static void downloadDataForAllStateHandles(
+        Map<StateHandleID, StreamStateHandle> stateHandleMap,
+        Path restoreInstancePath,
+        int restoringThreadNum,
+        CloseableRegistry closeableRegistry
+    ) throws Exception {
+
+        List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+        if (enableMultiThreadDownload(restoringThreadNum)) {
+            runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+        } else {
+            runTransferWithSingleThread(runnables);
+        }
+    }
+
+    private static List<Runnable> createDownloadRunnables(
+        Map<StateHandleID, StreamStateHandle> stateHandleMap,
+        Path restoreInstancePath,
+        CloseableRegistry closeableRegistry
+    ) {
+        List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
+        for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+            StateHandleID stateHandleID = entry.getKey();
+            StreamStateHandle remoteFileHandle = entry.getValue();
+
+            Path path = new Path(restoreInstancePath, stateHandleID.toString());
+            runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry));
+        }
+        return runnables;
+    }
+
+    private static void runTransferWithMultipleThreads(
+        List<Runnable> runnables,
+        int threadNum,
+        CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExcepti
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708879#comment-16708879 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238708730

##
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
##
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708876#comment-16708876 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238710301

##
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+    public static void transferAllStateDataToDirectory(
+        IncrementalKeyedStateHandle restoreStateHandle,
+        Path dest,
+        int restoringThreadNum,
+        CloseableRegistry closeableRegistry) throws Exception {
+
+        final Map<StateHandleID, StreamStateHandle> sstFiles =
+            restoreStateHandle.getSharedState();
+        final Map<StateHandleID, StreamStateHandle> miscFiles =
+            restoreStateHandle.getPrivateState();
+
+        downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
+        downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+    }
+
+    /**
+     * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+     * {@link StateHandleID}.
+     */
+    private static void downloadDataForAllStateHandles(
+        Map<StateHandleID, StreamStateHandle> stateHandleMap,
+        Path restoreInstancePath,
+        int restoringThreadNum,
+        CloseableRegistry closeableRegistry
+    ) throws Exception {
+
+        List<Runnable> runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+
+        if (enableMultiThreadDownload(restoringThreadNum)) {
+            runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry);
+        } else {
+            runTransferWithSingleThread(runnables);
+        }

Review comment:
I agree, we will have less code by just using the `DirectExecutorService` instead of looping over the `Runnable`s. Then the code in `runTransferWithMultipleThreads` could go back into `downloadDataForAllStateHandles`, and `runTransferWithSingleThread` would not be needed anymore.
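The simplification suggested in the review comment above, one code path for both thread counts with only the executor differing, might look roughly like the sketch below. It uses only JDK classes; `UnifiedTransferSketch`, `runAll`, and `transfer` are hypothetical names, `Runnable::run` stands in for a direct executor, and the placeholder tasks stand in for the real file downloads. The actual patch uses Flink's own utilities, which are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: the same submit-and-wait logic for threadNum = 1 and threadNum > 1. */
final class UnifiedTransferSketch {

    /** Runs all tasks on the given executor and blocks until every one has completed. */
    static void runAll(List<Runnable> tasks, Executor executor)
            throws ExecutionException, InterruptedException {
        List<CompletableFuture<Void>> futures = new ArrayList<>(tasks.size());
        for (Runnable task : tasks) {
            futures.add(CompletableFuture.runAsync(task, executor));
        }
        // A failure in any task surfaces here as an ExecutionException.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
    }

    /** Only the choice of executor depends on the thread count. */
    static void transfer(List<Runnable> tasks, int threadNum)
            throws ExecutionException, InterruptedException {
        if (threadNum > 1) {
            ExecutorService pool = Executors.newFixedThreadPool(threadNum);
            try {
                runAll(tasks, pool);
            } finally {
                pool.shutdownNow();
            }
        } else {
            // Stand-in for a direct executor: each task runs on the calling thread.
            runAll(tasks, Runnable::run);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger downloaded = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            tasks.add(downloaded::incrementAndGet); // placeholder for one file download
        }
        transfer(tasks, 1); // single-threaded path
        transfer(tasks, 4); // multi-threaded path
        System.out.println(downloaded.get()); // prints 16
    }
}
```

With this shape, a separate single-thread method that loops over the runnables is no longer needed; the direct executor gives the same sequential behaviour through the shared path.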
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708869#comment-16708869 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238708730

##
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
##
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708853#comment-16708853 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238362343 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)); + } + return runnables; + } + + private static void runTransferWithMultipleThreads( + List runnables, + int threadNum, + CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExcepti
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708855#comment-16708855 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238702567 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)); + } + return runnables; + } + + private static void runTransferWithMultipleThreads( + List runnables, + int threadNum, + CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExce
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708854#comment-16708854 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238702953 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (isMultiThreadDownloadEnabled(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + + runnables.add(ThrowingRunnable.unchecked(new ThrowingRunnable() { + @Override + public void run() throws IOException { + downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry); + } + })); +
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708856#comment-16708856 ]

ASF GitHub Bot commented on FLINK-10461:

tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238701173

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -686,6 +697,17 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
 	return options;
 }

+	/**
+	 * Gets the thread number will used for downloading files from DFS when restore.
+	 */
+	public int getNumberOfRestoringThreads() {
+		return restoringThreadNum == -1 ? RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : restoringThreadNum;

Review comment:
Can we also rename the field so that it matches the getter?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Speed up download file procedure when restore
> ---------------------------------------------
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Congxian Qiu
> Assignee: Congxian Qiu
> Priority: Major
> Labels: pull-request-available
>
> In the current master branch, restore downloads files from DFS in a single
> thread. This could be sped up by using multiple threads to download state
> from DFS.
>
> In my company, state can reach several terabytes, so the restore procedure
> becomes a little slow. After a bit of digging, I found that state is
> downloaded from DFS with a single thread; using multiple threads could
> speed this up.
>
> I measured the time taken to download ~2 terabytes of state from DFS:
> 640+ s with a single thread, and 130+ s with 5 threads.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
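As an illustration of the review comment on `getNumberOfRestoringThreads()` in this message, the following is a minimal sketch of a field named after its getter, with the same "-1 means not configured" fallback as the quoted diff. The class name, the constants, and the default value of 1 are assumptions made for this sketch only; they are not taken from the pull request or from `RocksDBOptions`.

```java
/** Hypothetical, simplified stand-in for the thread-count setting of the state backend. */
class RestoreThreadConfigSketch {

    /** Sentinel meaning "not configured"; mirrors the -1 check in the quoted diff. */
    static final int UNDEFINED_NUMBER_OF_RESTORING_THREADS = -1;

    /** Assumed default for this sketch only; the real default lives in the config option. */
    static final int DEFAULT_NUMBER_OF_RESTORING_THREADS = 1;

    /** Field name matches the getter and setter below. */
    private int numberOfRestoringThreads = UNDEFINED_NUMBER_OF_RESTORING_THREADS;

    void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
        if (numberOfRestoringThreads <= 0) {
            throw new IllegalArgumentException("The number of restoring threads must be positive.");
        }
        this.numberOfRestoringThreads = numberOfRestoringThreads;
    }

    /** Falls back to the default while the field still holds the sentinel. */
    int getNumberOfRestoringThreads() {
        return numberOfRestoringThreads == UNDEFINED_NUMBER_OF_RESTORING_THREADS
            ? DEFAULT_NUMBER_OF_RESTORING_THREADS
            : numberOfRestoringThreads;
    }

    public static void main(String[] args) {
        RestoreThreadConfigSketch config = new RestoreThreadConfigSketch();
        System.out.println(config.getNumberOfRestoringThreads()); // unset: falls back to the default
        config.setNumberOfRestoringThreads(5);
        System.out.println(config.getNumberOfRestoringThreads()); // explicitly configured value
    }
}
```

Keeping the sentinel private to the class means callers only ever see a usable thread count.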
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708834#comment-16708834 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238700871 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + }

Review comment: `DirectExecutorService` would execute everything in the calling thread.
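To make the point about a direct executor concrete, here is a small sketch using only the JDK. It does not use Flink's `DirectExecutorService`; the lambda-based `Executor` and the class name below are stand-ins chosen for illustration. It compares the thread that runs a task on a direct executor with the thread used by a single-thread pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo class; not part of Flink. */
class DirectExecutionSketch {

    /** Submits one task to the executor, waits for it, and returns the thread that ran it. */
    static Thread threadUsedBy(Executor executor) {
        AtomicReference<Thread> seen = new AtomicReference<>();
        CompletableFuture.runAsync(() -> seen.set(Thread.currentThread()), executor).join();
        return seen.get();
    }

    public static void main(String[] args) {
        // A direct executor simply invokes run() on the submitting thread.
        Executor direct = Runnable::run;
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(threadUsedBy(direct) == Thread.currentThread()); // prints "true"
            System.out.println(threadUsedBy(pool) == Thread.currentThread());   // prints "false"
        } finally {
            pool.shutdown();
        }
    }
}
```

With a direct executor, the `threadNum == 1` case keeps its single-threaded behavior even when it goes through the same executor-based code path.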
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708579#comment-16708579 ] ASF GitHub Bot commented on FLINK-10461: klion26 commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238623643 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + }

Review comment: In my opinion, if `restoringThreadNum == 1`, we should keep the current behavior and run the `runnables` in the current thread rather than in the executor service. What do you think?
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707448#comment-16707448 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238329439 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + }

Review comment: Wouldn't it be better to first instantiate the `ExecutorService` and then use it to download the files?

```
ExecutorService executorService = createExecutorService(restoringThreadNum);

for (Runnable runnable : createDownloadRunnables()) {
	executorService.execute(runnable);
}
```

If `restoringThreadNum == 1`, then we create a direct executor service. That way we would not need to duplicate the logic in `runTransferWithSingleThread`.
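The suggestion above can be sketched as a self-contained program. This is only an illustration under assumptions: `SameThreadExecutorService` is a minimal stand-in written for this sketch (it is not Flink's `DirectExecutorService`), `createExecutorService` and `runTransfer` are hypothetical helpers, and the "downloads" are counters rather than real file transfers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of one code path for both threadNum == 1 and threadNum > 1. */
class SharedTransferPathSketch {

    /** Minimal same-thread ExecutorService, a stand-in for a direct executor service. */
    static final class SameThreadExecutorService extends AbstractExecutorService {
        private volatile boolean shutdown;

        @Override public void execute(Runnable command) { command.run(); }
        @Override public void shutdown() { shutdown = true; }
        @Override public List<Runnable> shutdownNow() { shutdown = true; return new ArrayList<>(); }
        @Override public boolean isShutdown() { return shutdown; }
        @Override public boolean isTerminated() { return shutdown; }
        @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }
    }

    /** Picks a same-thread executor for one thread, a fixed pool otherwise. */
    static ExecutorService createExecutorService(int threadNum) {
        return threadNum > 1
            ? Executors.newFixedThreadPool(threadNum)
            : new SameThreadExecutorService();
    }

    /** Runs all tasks on the chosen executor and blocks until every one has finished. */
    static void runTransfer(List<Runnable> runnables, int threadNum) {
        ExecutorService executorService = createExecutorService(threadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size());
            for (Runnable runnable : runnables) {
                futures.add(CompletableFuture.runAsync(runnable, executorService));
            }
            // join() rethrows the first task failure wrapped in a CompletionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        for (int threadNum : new int[] {1, 4}) {
            AtomicInteger downloaded = new AtomicInteger();
            List<Runnable> runnables = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                runnables.add(downloaded::incrementAndGet); // stands in for one file download
            }
            runTransfer(runnables, threadNum);
            System.out.println(threadNum + " thread(s): " + downloaded.get() + " files");
        }
    }
}
```

In this sketch the only branch on the thread count is inside `createExecutorService`, so there is no separate single-threaded transfer method to keep in sync.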
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707450#comment-16707450 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238328094 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; Review comment: Is it possible to write this test without mocking? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will become a litter slow, after a bit digging, I find download > states from DFS using single thread, this could using multi-thread for speed > up. 
> I tested the time taken to download states from DFS with ~2 terabytes of state. > With a single thread it took 640+s, and 130+s when using 5 threads for the download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707451#comment-16707451 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238330014 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)); + } + return runnables; + } + + private static void runTransferWithMultipleThreads( + List runnables, + int threadNum, + CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExce
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707446#comment-16707446 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238324894 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. +*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { Review comment: Inconsistent code style in this file. Closing bracket should be on the previous line. This is an automated message from the Apache Git Service. 
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707444#comment-16707444 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238320078 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for RocksDbStateDataTransfer. + */ +public class RocksDBStateDataTransferTest { Review comment: Should extend `TestLogger` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. 
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707445#comment-16707445 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238325757 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { Review comment: Inconsistent code style This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major >
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707447#comment-16707447 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238326175 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)); + } + return runnables; + } + + private static void runTransferWithMultipleThreads( + List runnables, + int threadNum, + CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExce
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707452#comment-16707452 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238318368 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -686,6 +697,17 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() { return options; } + /** +* Gets the thread number will used for downloading files from DFS when restore. +*/ + public int getrestoringThreadNum() { Review comment: The getter and setter should use the name `getRestoringThreadNum`, or better, `getNumberOfRestoringThreads`. > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore downloads files from DFS; the > download procedure is single-threaded and could be sped up by using > multiple threads for downloading states from DFS. > > In my company, the states can grow to some terabytes, so the restore > procedure becomes a little slow. After a bit of digging, I found that states are > downloaded from DFS using a single thread; using multiple threads could speed this > up. > I tested the time taken to download states from DFS with ~2 terabytes of state. 
> With a single thread it took 640+s, and 130+s when using 5 threads for the download.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707449#comment-16707449 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238331599 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)); + } + return runnables; + } + + private static void runTransferWithMultipleThreads( + List runnables, + int threadNum, + CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExce
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707453#comment-16707453 ] ASF GitHub Bot commented on FLINK-10461: tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r238320013 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + + if (enableMultiThreadDownload(restoringThreadNum)) { + runTransferWithMultipleThreads(runnables, restoringThreadNum, closeableRegistry); + } else { + runTransferWithSingleThread(runnables); + } + } + + private static List createDownloadRunnables( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)); + } + return runnables; + } + + private static void runTransferWithMultipleThreads( + List runnables, + int threadNum, + CloseableRegistry closeableRegistry) throws ExecutionException, InterruptedException, IOExce
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707443#comment-16707443 ]

ASF GitHub Bot commented on FLINK-10461:

tillrohrmann commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r238324373

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {

Review comment:
Does this need to be public?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Speed up download file procedure when restore
> ---------------------------------------------
>
>                 Key: FLINK-10461
>                 URL: https://issues.apache.org/jira/browse/FLINK-10461
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Congxian Qiu
>            Assignee: Congxian Qiu
>            Priority: Major
>              Labels: pull-request-available
>
> In the current master branch, restore downloads files from DFS with a
> single thread. This could be sped up by using multiple threads to
> download the state from DFS.
>
> In my company, the state can reach several terabytes, so the restore
> procedure becomes a little slow. After a bit of digging, I found that
> the state is downloaded from DFS with a single thread; multiple threads
> could be used to speed this up.
> I tested the time needed to download ~2 terabytes of state from DFS:
> with a single thread it took 640+ s, and 130+ s when using 5 threads.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704758#comment-16704758 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443209887

@azagrebin Got it, thank you for the explanation. I've just removed the `running` flag.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704680#comment-16704680 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443193074

In general, I do not see any problem with having multiple threads downloading, even if they download the same data. If your question is whether they can interfere with each other locally: the target directories are randomised, so they should download to different locations even when running at the same time. Ultimately, it is the JM's job to make sure that only one of them is used.
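The point about randomised target directories can be illustrated with a minimal sketch. It uses only the JDK, not Flink: `newRestoreDir`, `download`, the `restore-` prefix, and the file name are hypothetical names chosen for illustration, and a random UUID merely stands in for however the restore path is actually randomised.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

class RandomisedTargetSketch {

    // Hypothetical helper: every restore attempt gets its own randomly named directory.
    static Path newRestoreDir(Path base) throws IOException {
        return Files.createDirectories(base.resolve("restore-" + UUID.randomUUID()));
    }

    // Stand-in for downloading one state file into the attempt's directory.
    static Path download(Path restoreDir, String fileName, byte[] data) throws IOException {
        return Files.write(restoreDir.resolve(fileName), data);
    }

    // Two attempts fetch the same file name; returns true if both copies exist at different paths.
    static boolean attemptsDoNotCollide() {
        try {
            Path base = Files.createTempDirectory("flink-sketch");
            byte[] data = "same state file".getBytes(StandardCharsets.UTF_8);
            Path first = download(newRestoreDir(base), "000042.sst", data);
            Path second = download(newRestoreDir(base), "000042.sst", data);
            return !first.equals(second) && Files.exists(first) && Files.exists(second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("independent targets: " + attemptsDoNotCollide());
    }
}
```

Because each attempt writes below its own directory, two attempts that overlap in time never touch the same local file, which is why no extra coordination between them is needed on the local side.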
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704659#comment-16704659 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443188022

@azagrebin Thanks for your reply. I have a question about this: can a JM failover lead to two download threads? If not, I'll make the change and remove the `running` flag. Thanks.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704490#comment-16704490 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443148880

Thanks for addressing the comment.

One more thing: I think we do not actually need the `running` flag, because we register the streams in the closeable registry. If the job is cancelled, the runnables will either fail to register their streams, or they will be interrupted because the streams are closed unexpectedly, which breaks the while loop with an IO exception anyway. That is what we want. The while loops will not hold up the executor shutdown for long. Sorry for the confusion. Can we still change it and remove the atomic `running`?
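The argument for dropping the `running` flag can be sketched with plain JDK classes. This is a minimal illustration under stated assumptions, not the code from the pull request: `SimpleRegistry` and `CloseAwareInput` are hypothetical stand-ins, and the only behaviour assumed of the real registry is what the comment describes (registration fails once the registry is closed, and closing the registry closes the registered streams).

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

class RegistryCancellationSketch {

    // Hypothetical, simplified stand-in for a closeable registry (not Flink's class).
    static final class SimpleRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();
        private boolean closed;

        synchronized void register(Closeable c) throws IOException {
            if (closed) {
                c.close();
                throw new IOException("registry already closed");
            }
            registered.add(c);
        }

        @Override
        public synchronized void close() throws IOException {
            closed = true;
            for (Closeable c : registered) {
                c.close();
            }
            registered.clear();
        }
    }

    // Source of `size` zero bytes that fails every read after close().
    static final class CloseAwareInput extends InputStream {
        private int remaining;
        private volatile boolean closed;

        CloseAwareInput(int size) {
            this.remaining = size;
        }

        @Override
        public int read() throws IOException {
            if (closed) {
                throw new IOException("stream closed");
            }
            return remaining-- > 0 ? 0 : -1;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Copy loop without any `running` flag: a closed stream ends it with an IOException.
    static void copy(SimpleRegistry registry, InputStream in, OutputStream out) throws IOException {
        registry.register(in);
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
        }
    }

    // Copies totalBytes and simulates a cancel (closing the registry) right after
    // cancelAfter bytes were written. Returns the number of bytes written.
    static int bytesCopiedBeforeCancel(int totalBytes, int cancelAfter) {
        SimpleRegistry registry = new SimpleRegistry();
        int[] written = {0};
        OutputStream out = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                if (++written[0] == cancelAfter) {
                    registry.close();
                }
            }
        };
        try {
            copy(registry, new CloseAwareInput(totalBytes), out);
        } catch (IOException expectedOnCancel) {
            // The closed stream broke the loop; nothing else to do in this sketch.
        }
        return written[0];
    }

    // A task that starts after cancellation cannot even register its stream.
    static boolean registerFailsAfterClose() {
        SimpleRegistry registry = new SimpleRegistry();
        try {
            registry.close();
            registry.register(new CloseAwareInput(1));
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes copied before cancel: " + bytesCopiedBeforeCancel(10, 3));
        System.out.println("late registration rejected: " + registerFailsAfterClose());
    }
}
```

Both exits described in the comment show up here: a copy that is already running stops at its next read, and a copy that has not started yet fails at registration, so the loop needs no separate flag to check.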
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704215#comment-16704215 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443079619

Thanks a lot for your reviews; I learnt a lot from them.
@azagrebin I fixed the Javadoc. Thank you again.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703374#comment-16703374 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r237538899

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java ##
@@ -45,4 +45,12 @@
 		.withDescription(String.format("This determines the factory for timer service state implementation. Options " +
 			"are either %s (heap-based, default) or %s for an implementation based on RocksDB .",
 			HEAP.name(), ROCKSDB.name()));
+
+	/**
+	 * The thread numbers used to download files from DFS in RocksDBStateBackend.

Review comment:
Also here, without the s: `The thread number used`
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702571#comment-16702571 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-442662282

@azagrebin Thanks for your review. I've just addressed the comments; please take another look when you have time.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701960#comment-16701960 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r237105565 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoredThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+	 */
+	private static void transferAllDataFromStateHandles(
+		Map<StateHandleID, StreamStateHandle> stateHandleMap,
+		Path restoreInstancePath,
+		int restoredThreadNum,
+		CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		AtomicBoolean running = new AtomicBoolean(true);
+		List<Runnable> runnables = getRunnablesForStateHandles(stateHandleMap, restoreInstancePath, closeableRegistry, running);
+
+		if (enableMultiThreadDownload(restoredThreadNum)) {
+			runRunnalbesMultithread(runnables, restoredThreadNum, closeableRegistry, running);

Review comment:
I would rename like this:
transferAllDataFromStateHandles -> downloadDataForAllStateHandles
getRunnablesForStateHandles -> createDownloadRunnables
runRunnalbesMultithread -> runTransferWithMultipleThreads
runRunnablesSingleThread -> runTransferWithSingleThread
copyStateDataHandleData -> downloadDataForStateHandle

Also, I would rename everywhere:
restoredThreadNum -> restoringThreadNum
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701961#comment-16701961 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r237104577 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoredThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void transferAllDataFromStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoredThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + AtomicBoolean running = new AtomicBoolean(true); + List runnables = getRunnablesForStateHandles(stateHandleMap, restoreInstancePath, closeableRegistry, running); + + if (enableMultiThreadDownload(restoredThreadNum)) { + runRunnalbesMultithread(runnables, restoredThreadNum, closeableRegistry, running); + } else { + runRunnablesSingleThread(runnables); + } + } + + private static List getRunnablesForStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + CloseableRegistry closeableRegistry, + AtomicBoolean running + ) { + List runnables = new ArrayList<>(stateHandleMap.size()); + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + Path path = new Path(restoreInstancePath, stateHandleID.toString()); + runnables.add(() -> copyStateDataHandleData(path, remoteFileHandle, closeableRegistry, running)); + } + return runnables; + } + + private static void runRunnalbesMultithread( +
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701950#comment-16701950 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236202672 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + private static volatile AtomicBoolean running = new AtomicBoolean(false); + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoredThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void transferAllDataFromStateHandles( Review comment: I think this method should be broken down into 3 methods: - loop which creates `List` - multi-threaded version where `executorService`, `running` and `shutDownAllTasksAndExecutorService` are used and Runnables are submitted in a loop with try/catch. - single-threaded version which just runs Runnables. If `restoredThreadNum` > 1 then multi-threaded version should be called in this method else single-threaded version. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will be
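The three-way split proposed in the review comment above could look roughly like the following. This is a minimal sketch rather than the code from the pull request: `TransferSketch`, the method names, and the `BiConsumer` standing in for `copyStateDataHandleData` are all hypothetical, and plain strings replace `StateHandleID` and `StreamStateHandle` so the example compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the transfer utility; no Flink types are used. */
final class TransferSketch {

    /** Step 1: the loop that turns every (file name, handle) entry into a Runnable. */
    static <H> List<Runnable> createDownloadRunnables(Map<String, H> handles, BiConsumer<String, H> copy) {
        List<Runnable> runnables = new ArrayList<>(handles.size());
        for (Map.Entry<String, H> entry : handles.entrySet()) {
            runnables.add(() -> copy.accept(entry.getKey(), entry.getValue()));
        }
        return runnables;
    }

    /** Step 2: multi-threaded version; the pool is shut down whether or not a task fails. */
    static void runInParallel(List<Runnable> runnables, int threadNum) {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size());
            for (Runnable runnable : runnables) {
                futures.add(CompletableFuture.runAsync(runnable, executor));
            }
            // join() rethrows the first failure wrapped in a CompletionException.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Step 3: single-threaded version that simply runs the Runnables in order. */
    static void runSequentially(List<Runnable> runnables) {
        for (Runnable runnable : runnables) {
            runnable.run();
        }
    }

    /** Entry point: picks the variant from the configured thread number. */
    static <H> void transferAll(Map<String, H> handles, BiConsumer<String, H> copy, int threadNum) {
        List<Runnable> runnables = createDownloadRunnables(handles, copy);
        if (threadNum > 1) {
            runInParallel(runnables, threadNum);
        } else {
            runSequentially(runnables);
        }
    }

    public static void main(String[] args) {
        Map<String, String> handles = new LinkedHashMap<>();
        handles.put("000001.sst", "remote-handle-1");
        handles.put("000002.sst", "remote-handle-2");
        transferAll(handles, (name, handle) -> System.out.println("copy " + handle + " -> " + name), 2);
    }
}
```

Keeping the Runnable creation separate means both execution paths share the same per-file logic, which is the point of the suggested refactoring.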
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701508#comment-16701508 ] ASF GitHub Bot commented on FLINK-10461: klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-442356732 @azagrebin thanks for your review. I've addressed all your comments and created [FLINK-11008](https://issues.apache.org/jira/browse/FLINK-11008) as a follow-up issue. Please review this when you have time. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will become a litter slow, after a bit digging, I find download > states from DFS using single thread, this could using multi-thread for speed > up. > I test the time used for download states from DFS with ~2 terabytes states. > With single thread it used 640+s, and 130+s when using 5 threads for download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698754#comment-16698754 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236199821 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -276,6 +312,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config) this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined( config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS)); + if (original.restoredThreadNum == CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue()) { Review comment: here we could initialise `restoredThreadNum` from `config` if `original.restoredThreadNum` is -1 (undefined). Otherwise `restoredThreadNum` = `original.restoredThreadNum`, similar to `original.enableIncrementalCheckpointing.resolveUndefined`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. 
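The "resolve undefined" pattern suggested in the review comment above can be illustrated in isolation. The sketch below is not Flink code: `RestoreThreadNumResolver` is a hypothetical class, a plain `Map<String, String>` stands in for Flink's `Configuration`, and the fallback of 1 is taken from the reviewer's remark that 1 is the current behaviour.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; a Map plays the role of Flink's Configuration. */
final class RestoreThreadNumResolver {

    /** Sentinel meaning "not set programmatically", as used in the review comment. */
    static final int UNDEFINED = -1;

    /** Option key as it appears in the diff under review. */
    static final String KEY = "state.checkpoint.restore.thread.num";

    /** Assumed default when neither the backend nor the configuration defines a value. */
    static final int DEFAULT = 1;

    /**
     * Keeps the value of the original backend if it was set explicitly,
     * otherwise reads it from the configuration (or falls back to the default).
     */
    static int resolveUndefined(int originalThreadNum, Map<String, String> config) {
        if (originalThreadNum != UNDEFINED) {
            return originalThreadNum;
        }
        String configured = config.get(KEY);
        return configured == null ? DEFAULT : Integer.parseInt(configured);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(KEY, "4");
        System.out.println(resolveUndefined(UNDEFINED, config)); // value comes from the configuration
        System.out.println(resolveUndefined(2, config));         // explicit value wins
    }
}
```

Comparing against a dedicated sentinel rather than against the option's default value avoids treating an explicitly chosen default as "unset".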
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698755#comment-16698755 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236202672 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + private static volatile AtomicBoolean running = new AtomicBoolean(false); + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoredThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void transferAllDataFromStateHandles( Review comment: I think this method should be broken down into 3 methods: - loop which creates `List` - multi-threaded version where `executorService`, `running` and `shutDownAllTasksAndExecutorService` are used and Runnables are submitted in a loop with try/catch. - single-threaded version which just runs Runnables. If `restoredThreadNum` > 1 then multi-threaded version should be called in this method else single-threaded version. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will be
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698756#comment-16698756 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236186387 ## File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java ## @@ -115,4 +115,12 @@ .defaultValue(1024) .withDescription("The minimum size of state data files. All state chunks smaller than that are stored" + " inline in the root checkpoint metadata file."); + + /** +* The thread numbers used to download files from DFS in RocksDBStateBackend. +*/ + public static final ConfigOption CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions + .key("state.checkpoint.restore.thread.num") + .defaultValue(-1) Review comment: I think default value 1 was reasonable before, that is what we have now. -1 looks rather as undefined value. I would also consider to change option prefix to contain `rocksdb` in it: `state.backend.rocksdb.checkpoint.restore.thread.num` and to move the option to `org.apache.flink.contrib.streaming.state.RocksDBOptions`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698751#comment-16698751 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236198988 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -236,8 +239,25 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) { * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. */ public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) { + this(checkpointStreamBackend, enableIncrementalCheckpointing, CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue()); + } + + /** +* Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its +* checkpoint data streams. Typically, one would supply a filesystem or database state backend +* here where the snapshots from RocksDB would be stored. +* +* The snapshots of the RocksDB state will be stored using the given backend's +* {@link StateBackend#createCheckpointStorage(JobID)}. +* +* @param checkpointStreamBackend The backend write the checkpoint streams to. +* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. +* @param restoredThreadNum thread num used to download files from DFS when restore. 
+*/ + public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing, int restoredThreadNum) { this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + this.restoredThreadNum = restoredThreadNum; Review comment: Actually, instead of changing and exploding the constructor signatures, I would suggest to set it here to -1 (undefined). Then we could remove `final` from `restoredThreadNum` and add setter method for it, like for `predefinedOptions`. Users could use the setter to set this option per job. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will become a litter slow, after a bit digging, I find download > states from DFS using single thread, this could using multi-thread for speed > up. > I test the time used for download states from DFS with ~2 terabytes states. > With single thread it used 640+s, and 130+s when using 5 threads for download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698753#comment-16698753 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236203718 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + private static volatile AtomicBoolean running = new AtomicBoolean(false); + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoredThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, restoredThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, restoredThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. +*/ + private static void transferAllDataFromStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoredThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + final ExecutorService executorService = enableMultiThreadDownload(restoredThreadNum) ? 
+ Executors.newFixedThreadPool(restoredThreadNum) : + null; + + running.set(true); Review comment: `AtomicBoolean running = new AtomicBoolean(true);` can be local variable passed to `copyStateDataHandleData` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by usi
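The suggestion above, replacing the static `running` field with a local `AtomicBoolean` handed to the copy routine, might be sketched as follows. `CopySketch` and both method names are hypothetical, generic `java.io` streams stand in for `FSDataInputStream` and `FSDataOutputStream`, and it is an assumption of this sketch that the copy loop polls the flag between chunks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration; java.io streams replace Flink's FS streams. */
final class CopySketch {

    /** Copies chunks from in to out until EOF or until running is cleared; returns bytes copied. */
    static long copyWhileRunning(InputStream in, OutputStream out, AtomicBoolean running) throws IOException {
        byte[] buffer = new byte[8 * 1024];
        long total = 0;
        while (running.get()) {
            int read = in.read(buffer);
            if (read == -1) {
                break; // end of stream
            }
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    /** The flag lives on the caller's stack, so concurrent restores cannot interfere with each other. */
    static long transfer(InputStream in, OutputStream out) throws IOException {
        AtomicBoolean running = new AtomicBoolean(true);
        return copyWhileRunning(in, out, running);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = transfer(new ByteArrayInputStream(new byte[] {1, 2, 3}), out);
        System.out.println("copied " + copied + " bytes");
    }
}
```

A static flag would be shared by every restore running in the same JVM, whereas a local one is scoped to a single call; another thread that holds the same reference can still clear it to stop the copy early.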
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698752#comment-16698752 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r236200642 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -686,6 +729,13 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() { return options; } + /** +* Gets the thread number will used for downloading files from DFS when restore. +*/ + public int getRestoredThreadNum() { + return restoredThreadNum; Review comment: here we could return `restoredThreadNum` if it is not -1 (defined) and `CheckpointingOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue()` if `restoredThreadNum` is still -1 (undefined). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. 
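The getter described in the comment above, combined with the setter proposed elsewhere in this review, could be sketched like this. `BackendSketch` is a hypothetical class rather than `RocksDBStateBackend`; the default of 1 and the rejection of non-positive values in the setter are assumptions of the sketch, not behaviour confirmed by the pull request.

```java
/** Hypothetical stand-in for the state backend; only the thread-number handling is shown. */
final class BackendSketch {

    /** Sentinel for "not configured by the user". */
    static final int UNDEFINED = -1;

    /** Assumed default; stands in for the config option's defaultValue(). */
    static final int DEFAULT_RESTORE_THREAD_NUM = 1;

    /** Non-final so it can be set per job after construction. */
    private int restoredThreadNum = UNDEFINED;

    /** Per-job override; rejecting non-positive values is an assumption of this sketch. */
    void setRestoredThreadNum(int restoredThreadNum) {
        if (restoredThreadNum <= 0) {
            throw new IllegalArgumentException("restoredThreadNum must be positive, was " + restoredThreadNum);
        }
        this.restoredThreadNum = restoredThreadNum;
    }

    /** Returns the configured value, or the default while the field is still undefined. */
    int getRestoredThreadNum() {
        return restoredThreadNum == UNDEFINED ? DEFAULT_RESTORE_THREAD_NUM : restoredThreadNum;
    }

    public static void main(String[] args) {
        BackendSketch backend = new BackendSketch();
        System.out.println(backend.getRestoredThreadNum()); // default while undefined
        backend.setRestoredThreadNum(5);
        System.out.println(backend.getRestoredThreadNum()); // user-provided value
    }
}
```

With this shape the constructors keep their existing signatures, and callers never observe the sentinel value.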
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692689#comment-16692689 ] ASF GitHub Bot commented on FLINK-10461: klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-440143611 Hi, @azagrebin , please help to review this when you have time. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will become a litter slow, after a bit digging, I find download > states from DFS using single thread, this could using multi-thread for speed > up. > I test the time used for download states from DFS with ~2 terabytes states. > With single thread it used 640+s, and 130+s when using 5 threads for download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674635#comment-16674635 ] ASF GitHub Bot commented on FLINK-10461: klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-435747811 Hi, @azagrebin , I've just addressed all comments, please help to review this when you have time. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will become a litter slow, after a bit digging, I find download > states from DFS using single thread, this could using multi-thread for speed > up. > I test the time used for download states from DFS with ~2 terabytes states. > With single thread it used 640+s, and 130+s when using 5 threads for download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667354#comment-16667354 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228937223 ## File path: docs/_includes/generated/checkpointing_configuration.html ## @@ -32,6 +32,11 @@ false + +state.checkpoint.restore.thread.num +1 +The thread numbers used to download files from DFS in RocksDBStateBackend. Review comment: I think `The thread number` should be singular without `s`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download file from DFS, the > download procedure are single-thread, this could speed up by using > multi-thread for downloading states from DFS. > > In my company, the states will come to some terabytes, so the restore > procedure will become a litter slow, after a bit digging, I find download > states from DFS using single thread, this could using multi-thread for speed > up. > I test the time used for download states from DFS with ~2 terabytes states. > With single thread it used 640+s, and 130+s when using 5 threads for download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667359#comment-16667359 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228970609 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int resotreThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, resotreThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, resotreThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void transferAllDataFromStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int resotreThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + final ExecutorService executorService = Executors.newFixedThreadPool(resotreThreadNum); + List> futures = new LinkedList<>(); + List closeables = new LinkedList<>(); + + try { + closeables.add(() -> executorService.shutdownNow()); + closeableRegistry.registerCloseable(((LinkedList) closeables).getLast()); + + for (Map.Entry entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + if (resotreThreadNum > 1) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle, closeableRegistry); + } catch (IOException e) { Review comment: the exception handling can go inside `copyStateDataHandleData`. `new Path(restoreInstancePath, stateHandleID.toString())` can be a variable to reduce the line length. I would create a list of `Runnable`'s: ``` Path path = new Path(restoreInstancePath, stateHandleID.toString()); runnables.add( -> copyStateD
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667357#comment-16667357 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937420

## File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java ##
@@ -115,4 +115,12 @@
 			.defaultValue(1024)
 			.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
 				" inline in the root checkpoint metadata file.");
+
+	/**
+	 * The thread numbers used to download files from DFS in RocksDBStateBackend.
+	 */
+	public static final ConfigOption<Integer> CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions
+		.key("state.checkpoint.restore.thread.num")
+		.defaultValue(1)
+		.withDescription("The thread numbers used to download files from DFS in RocksDBStateBackend.");

Review comment:
Also here: `The thread number`, without the `s`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Speed up download file procedure when restore
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Congxian Qiu
> Assignee: Congxian Qiu
> Priority: Major
> Labels: pull-request-available
>
> In the current master branch, restore downloads files from DFS in a single
> thread; this could be sped up by using multiple threads to download state
> from DFS.
>
> In my company, the state can reach several terabytes, so the restore
> procedure becomes a little slow. After a bit of digging, I found that state
> is downloaded from DFS with a single thread; using multiple threads could
> speed this up.
> I tested the time needed to download ~2 terabytes of state from DFS:
> 640+s with a single thread, and 130+s with 5 threads.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667360#comment-16667360 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970266

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+		IncrementalKeyedStateHandle restoreStateHandle,
+		Path dest,
+		int resotreThreadNum,
+		CloseableRegistry closeableRegistry) throws Exception {
+
+		final Map<StateHandleID, StreamStateHandle> sstFiles =
+			restoreStateHandle.getSharedState();
+		final Map<StateHandleID, StreamStateHandle> miscFiles =
+			restoreStateHandle.getPrivateState();
+
+		transferAllDataFromStateHandles(sstFiles, dest, resotreThreadNum, closeableRegistry);
+		transferAllDataFromStateHandles(miscFiles, dest, resotreThreadNum, closeableRegistry);
+	}
+
+	/**
+	 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+	 * {@link StateHandleID}.
+	 */
+	private static void transferAllDataFromStateHandles(
+		Map<StateHandleID, StreamStateHandle> stateHandleMap,
+		Path restoreInstancePath,
+		int resotreThreadNum,
+		CloseableRegistry closeableRegistry
+	) throws Exception {
+
+		final ExecutorService executorService = Executors.newFixedThreadPool(resotreThreadNum);
+		List<CompletableFuture<Void>> futures = new LinkedList<>();
+		List<Closeable> closeables = new LinkedList<>();
+
+		try {
+			closeables.add(() -> executorService.shutdownNow());

Review comment:
I think the tasks should first be properly canceled, and only then should `executorService` be shut down. `executorService.shutdownNow` also calls `future.cancel`, which interrupts the task thread. At the moment, `while (true)` in `copyStateDataHandleData` does not support explicit cancelation.

I suggest creating `running = new AtomicBoolean(true)` at the beginning; then `while (true)` can become `while (running.get())`. This allows canceling all tasks at once and shutting down `executorService` in one `closeable`:
`closeables.add(() -> { running.set(false); executorService.shutdownNow(); });`
The list of `closeables` is then not needed.
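The cancelation scheme suggested in the review comment above could look roughly like the sketch below. It uses only JDK classes; `CancellableCopySketch`, `copy`, `copyBytes` and `cancelAll` are hypothetical names chosen for illustration and are not part of the pull request or of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative stand-in for a cancelable copy loop; not Flink code. */
final class CancellableCopySketch {

    /** Copies until EOF or until {@code running} becomes false; returns the bytes copied. */
    static long copy(InputStream in, OutputStream out, AtomicBoolean running) throws IOException {
        byte[] buffer = new byte[8 * 1024];
        long copied = 0;
        // The flag replaces `while (true)`, so every task sharing it can be stopped at once.
        while (running.get()) {
            int numBytes = in.read(buffer);
            if (numBytes == -1) {
                break;
            }
            out.write(buffer, 0, numBytes);
            copied += numBytes;
        }
        return copied;
    }

    /** Convenience wrapper over in-memory streams that avoids checked exceptions. */
    static byte[] copyBytes(byte[] data, AtomicBoolean running) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copy(new ByteArrayInputStream(data), out, running);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** One closeable that first stops the copy loops and then shuts the pool down. */
    static Closeable cancelAll(AtomicBoolean running, ExecutorService executor) {
        return () -> {
            running.set(false);
            executor.shutdownNow();
        };
    }
}
```

With this shape, a single closeable registered once is enough to cancel every in-flight copy, which is why a separate list of closeables becomes unnecessary.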
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667355#comment-16667355 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228976096

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDbStateDataTransfer.
+ */
+public class RocksDBStateDataTransferTest {
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Test that the exception arose in the thread pool will rethrow to the main thread.
+	 */
+	@Test
+	public void testThreadPoolExceptionRethrow() throws Exception {
+		IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class);
+
+		SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread restore.");
+		StreamStateHandle mockStateHandle = mock(StreamStateHandle.class);
+		when(mockStateHandle.openInputStream()).thenThrow(expectedException);
+
+		Map<StateHandleID, StreamStateHandle> sharedStateHandle = new HashMap<>(1);
+		sharedStateHandle.put(new StateHandleID("mock"), mockStateHandle);
+		when(stateHandle.getSharedState()).thenReturn(sharedStateHandle);
+
+		try {
+			RocksDbStateDataTransfer.transferAllStateDataToDirectory(stateHandle, new Path(temporaryFolder.newFolder().toURI()), 5, new CloseableRegistry());
+			fail();
+		} catch (Exception e) {
+			assertEquals(expectedException, e.getCause().getCause());
+		}
+	}
+
+	/**
+	 * Tests that download files with multi-thread correctly.
+	 * @throws Exception
+	 */
+	@Test
+	public void testMultiThreadRestoreCorrectly() throws Exception {
+		IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class);
+
+		byte[] content1 = new byte[1];

Review comment:
I would create a list of something like 6 test contexts and loop over them for the further actions. The content length can also be random.
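A loop-driven check of the kind suggested in the review comment above might be structured as in the following sketch. It is a plain-JDK approximation: it writes to temporary files instead of going through Flink's `StreamStateHandle`, and `ParallelDownloadCheck`, `roundTrip` and the `restore-sketch` directory prefix are hypothetical names introduced only for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative parallel write-then-verify check; not the test from the pull request. */
final class ParallelDownloadCheck {

    /** Writes {@code fileCount} random contents in parallel and reports whether all files match. */
    static boolean roundTrip(int fileCount, int threadNum, long seed) {
        Random random = new Random(seed);
        List<byte[]> contents = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            // Random content length, between 1 and 4096 bytes.
            byte[] content = new byte[1 + random.nextInt(4096)];
            random.nextBytes(content);
            contents.add(content);
        }

        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            Path dir = Files.createTempDirectory("restore-sketch");
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < fileCount; i++) {
                Path file = dir.resolve("file-" + i);
                byte[] content = contents.get(i);
                futures.add(CompletableFuture.runAsync(() -> write(file, content), pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            // Verify every file in a loop rather than with one assertion per hand-written variable.
            for (int i = 0; i < fileCount; i++) {
                if (!Arrays.equals(contents.get(i), Files.readAllBytes(dir.resolve("file-" + i)))) {
                    return false;
                }
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            pool.shutdown();
        }
    }

    private static void write(Path file, byte[] content) {
        try {
            Files.write(file, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `ParallelDownloadCheck.roundTrip(6, 3, 42L)` exercises six files on three threads; the fixed seed keeps the random lengths reproducible between runs.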
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667356#comment-16667356 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228964472

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ##
@@ -235,9 +238,10 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
 	 * @param checkpointStreamBackend The backend write the checkpoint streams to.
 	 * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
 	 */
-	public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
+	public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing, int restoreThreadNum) {

Review comment:
I would also keep the previous constructor without `restoreThreadNum`:
```
public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
	this(checkpointStreamBackend, enableIncrementalCheckpointing, -1);
}
```
This way we do not break existing code, including other constructors here. For example, this modification, and other similar ones, is then not needed:
```
public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
	this(checkpointStreamBackend, TernaryBoolean.UNDEFINED, 1);
}
```
I would also rather keep the value undefined here, e.g. `-1`, similar to `TernaryBoolean.UNDEFINED` for `enableIncrementalCheckpointing`. It can then be resolved in `private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)` and in `getRestoreThreadNum`.
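The "undefined until configured" pattern described in the review comment above can be illustrated with a small self-contained class. This is only a sketch: `BackendConfigSketch`, `UNDEFINED_THREAD_NUM`, `DEFAULT_THREAD_NUM` and `configure` are hypothetical stand-ins, not the `RocksDBStateBackend` API, and a `boolean` takes the place of `TernaryBoolean` for brevity.

```java
/** Illustrative stand-in for a backend with an optional thread-count setting; not Flink code. */
final class BackendConfigSketch {

    /** Marker meaning "not set in code; resolve from configuration later". */
    static final int UNDEFINED_THREAD_NUM = -1;

    /** Assumed fallback, mirroring the proposed option's default value of 1. */
    static final int DEFAULT_THREAD_NUM = 1;

    private final boolean incremental;
    private final int restoreThreadNum;

    /** The pre-existing constructor keeps its signature and delegates with the undefined marker. */
    BackendConfigSketch(boolean incremental) {
        this(incremental, UNDEFINED_THREAD_NUM);
    }

    /** The new constructor adds the thread count without breaking existing callers. */
    BackendConfigSketch(boolean incremental, int restoreThreadNum) {
        this.incremental = incremental;
        this.restoreThreadNum = restoreThreadNum;
    }

    /** Returns a copy in which an undefined thread count is filled in from configuration. */
    BackendConfigSketch configure(int configuredThreadNum) {
        int resolved = restoreThreadNum == UNDEFINED_THREAD_NUM ? configuredThreadNum : restoreThreadNum;
        return new BackendConfigSketch(incremental, resolved);
    }

    /** An explicitly set value wins; otherwise the default applies. */
    int getRestoreThreadNum() {
        return restoreThreadNum == UNDEFINED_THREAD_NUM ? DEFAULT_THREAD_NUM : restoreThreadNum;
    }
}
```

The point of the marker value is precedence: a thread count passed in code is never overwritten by configuration, while an unset one falls through to the configured value and finally to the default.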
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667358#comment-16667358 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228967531

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+	public static void transferAllStateDataToDirectory(
+		IncrementalKeyedStateHandle restoreStateHandle,
+		Path dest,
+		int resotreThreadNum,

Review comment:
Typo: `resotreThreadNum` -> `restoredThreadNum`, also in the other methods.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657653#comment-16657653 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-431539541

Hi @azagrebin, could you please help review this?
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646246#comment-16646246 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-428903419

@azagrebin I've updated the PR based on your comments. Please help review it when you have time.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641854#comment-16641854 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-427837454

Thank you for your review, @azagrebin! I will push another commit to address your comments.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639908#comment-16639908 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223030850

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
 		Map<StateHandleID, StreamStateHandle> stateHandleMap,
 		Path restoreInstancePath) throws IOException {
+		ExecutorService executorService = Executors.newFixedThreadPool(5);
+		List<FutureTask<Void>> tasks = new ArrayList<>(stateHandleMap.size());
+
 		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
 			StateHandleID stateHandleID = entry.getKey();
 			StreamStateHandle remoteFileHandle = entry.getValue();
-			copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+
+			FutureTask<Void> task = new FutureTask<>(() -> {
+				try {
+					copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+				} catch (IOException e) {
+					LOG.error("Copy State Data error, stateHandleID [{}], remoteFileHandle[{}]", stateHandleID.toString(), remoteFileHandle);
+					throw e;
+				}
+				return null;
+			});
+			tasks.add(task);
+			executorService.submit(task);

Review comment:
I suggest using `CompletableFuture.runAsync(Runnable, Executor)` instead of `FutureTask`. It allows using `FutureUtils.waitForAll(CompletableFutures).get()`.
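As a rough illustration of the suggestion above, the sketch below schedules one `CompletableFuture.runAsync(Runnable, Executor)` task per file and waits for all of them. It is not the code from the pull request: the in-memory maps stand in for remote state handles and local files, `ParallelCopySketch` and `copyAll` are hypothetical names, and the JDK's `CompletableFuture.allOf(...).join()` is used as a plain-JDK substitute for Flink's `FutureUtils.waitForAll(...).get()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative stand-in for the parallel download step; not Flink code. */
final class ParallelCopySketch {

    /** Copies every entry of {@code source} into a new map using {@code threadNum} threads. */
    static Map<String, byte[]> copyAll(Map<String, byte[]> source, int threadNum) {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        Map<String, byte[]> target = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>(source.size());
        try {
            for (Map.Entry<String, byte[]> entry : source.entrySet()) {
                // One asynchronous task per file, scheduled on the shared pool.
                futures.add(CompletableFuture.runAsync(
                    () -> target.put(entry.getKey(), entry.getValue().clone()), executor));
            }
            // join() blocks until every task has finished; if any task failed,
            // it throws a CompletionException carrying that failure as its cause.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return target;
        } finally {
            executor.shutdown();
        }
    }
}
```

Compared with submitting `FutureTask` instances by hand, the futures compose: waiting and failure propagation happen in one call instead of a loop over `task.get()`.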
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639916#comment-16639916 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223029260

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
 		Map<StateHandleID, StreamStateHandle> stateHandleMap,
 		Path restoreInstancePath) throws IOException {
+		ExecutorService executorService = Executors.newFixedThreadPool(5);

Review comment:
The number of threads should be configurable, for example the same way as incremental checkpointing: at least in the `RocksDBStateBackend` constructor and in `CheckpointingOptions`. The default behaviour (e.g. one thread) can stay as it is now, basically running in the current thread. We can also create a follow-up issue to reconsider the RocksDB backend configuration later.
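One way to keep the single-thread default "running in the current thread", as the review comment above puts it, is to hand out an executor service that executes tasks inline when the thread count is 1. The sketch below is a minimal plain-JDK version of that idea; `DirectExecutorServiceSketch` and `forThreadNum` are hypothetical names and this is not Flink's own direct executor implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative executor service that runs each task in the calling thread; not Flink code. */
final class DirectExecutorServiceSketch extends AbstractExecutorService {

    private volatile boolean shutdown;

    @Override
    public void execute(Runnable command) {
        // No hand-off to another thread: the caller runs the task itself.
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        // Tasks run inline, so nothing is ever queued.
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }

    /** Picks a real pool for several threads and the inline executor for the default of one. */
    static ExecutorService forThreadNum(int threadNum) {
        return threadNum > 1 ? Executors.newFixedThreadPool(threadNum) : new DirectExecutorServiceSketch();
    }
}
```

Because both branches return an `ExecutorService`, the calling code can share a single code path for `threadNum = 1` and `threadNum > 1` instead of branching on the thread count.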
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639912#comment-16639912 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223032334

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
 		Map<StateHandleID, StreamStateHandle> stateHandleMap,
 		Path restoreInstancePath) throws IOException {
+		ExecutorService executorService = Executors.newFixedThreadPool(5);

Review comment:
Cancelation of the `copyStateDataHandleData` futures and `executorService.shutdownNow()` should be registered with `cancelStreamRegistry.registerCloseable()` and unregistered with `unregisterCloseable` before `executorService.shutdown()` in `finally`, similar to the streams in `copyStateDataHandleData`. This way we guarantee that all resources are freed if the job shuts down abruptly.
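The register / unregister discipline described in the review comment above can be sketched as follows. `SimpleRegistry` is a deliberately tiny, hypothetical stand-in written for this example; Flink's `CloseableRegistry` has a different and richer implementation, and only the call order (register, do the work, unregister in `finally`, then shut down) is meant to carry over.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative registration pattern for a thread pool; not Flink code. */
final class RegistrySketch {

    /** Minimal registry: closing it closes everything still registered. */
    static final class SimpleRegistry implements Closeable {
        private final Set<Closeable> closeables = new LinkedHashSet<>();

        synchronized void register(Closeable closeable) {
            closeables.add(closeable);
        }

        synchronized boolean unregister(Closeable closeable) {
            return closeables.remove(closeable);
        }

        synchronized int size() {
            return closeables.size();
        }

        @Override
        public synchronized void close() throws IOException {
            for (Closeable closeable : closeables) {
                closeable.close();
            }
            closeables.clear();
        }
    }

    /** Runs {@code work} on a pool whose forced shutdown is reachable through the registry. */
    static void runWithPool(SimpleRegistry registry, int threadNum, Runnable work) {
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        Closeable shutdownHook = () -> pool.shutdownNow();
        // While registered, closing the registry (e.g. on job cancelation) interrupts the pool.
        registry.register(shutdownHook);
        try {
            CompletableFuture.runAsync(work, pool).join();
        } finally {
            // Normal completion: remove the hook first, then shut down gracefully.
            registry.unregister(shutdownHook);
            pool.shutdown();
        }
    }
}
```

The hook is only reachable through the registry while the work is in flight, so an abrupt shutdown can still stop the pool, while the normal path leaves nothing behind in the registry.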
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639911#comment-16639911 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223034163

## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendTest.java ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDBKeyedStateBackend.
+ */
+public class RocksDBKeyedStateBackendTest {

Review comment:
In addition to the failure scenario, I would also add a test that mocks the `StreamStateHandle` streams and checks that all streams are read and written correctly in parallel.
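
The parallel read/write check suggested in the review comment above could be approximated as follows. This is a sketch only: it uses plain in-memory streams instead of Mockito mocks of `StreamStateHandle`, and `copyAllInParallel`, `copy` and `allCopiedCorrectly` are hypothetical helper names rather than methods of the patch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelCopyCheckSketch {

    /** Copies every "remote file" on a fixed pool and returns the copies by handle name. */
    static Map<String, byte[]> copyAllInParallel(Map<String, byte[]> remoteFiles, int threadNum) {
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        try {
            Map<String, Future<byte[]>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, byte[]> entry : remoteFiles.entrySet()) {
                // In-memory stand-in for the stream a state handle would open.
                InputStream in = new ByteArrayInputStream(entry.getValue());
                futures.put(entry.getKey(), executorService.submit(() -> copy(in)));
            }
            Map<String, byte[]> copies = new LinkedHashMap<>();
            for (Map.Entry<String, Future<byte[]>> entry : futures.entrySet()) {
                copies.put(entry.getKey(), entry.getValue().get());
            }
            return copies;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Copy failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while copying", e);
        } finally {
            executorService.shutdown();
        }
    }

    /** Reads the stream to the end with a fixed-size buffer and returns the bytes read. */
    static byte[] copy(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8 * 1024];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
        return out.toByteArray();
    }

    /** Builds distinct payloads, copies them in parallel and compares byte for byte. */
    static boolean allCopiedCorrectly(int fileCount, int threadNum) {
        Map<String, byte[]> remoteFiles = new LinkedHashMap<>();
        for (int i = 0; i < fileCount; i++) {
            byte[] payload = new byte[20_000 + i];
            Arrays.fill(payload, (byte) i);
            remoteFiles.put("handle-" + i, payload);
        }
        Map<String, byte[]> copies = copyAllInParallel(remoteFiles, threadNum);
        if (!copies.keySet().equals(remoteFiles.keySet())) {
            return false;
        }
        for (Map.Entry<String, byte[]> entry : remoteFiles.entrySet()) {
            if (!Arrays.equals(entry.getValue(), copies.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("all copied correctly: " + allCopiedCorrectly(5, 3));
    }
}
```

Giving each payload a different length and fill byte means a copy that mixed up two streams would fail the comparison, which is the property a mocked-stream test would also want to pin down.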
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639909#comment-16639909 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223030317

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map stateHandleMap,
         Path restoreInstancePath) throws IOException {

+        ExecutorService executorService = Executors.newFixedThreadPool(5);
+        List> tasks = new ArrayList<>(stateHandleMap.size());
+
         for (Map.Entry entry : stateHandleMap.entrySet()) {
             StateHandleID stateHandleID = entry.getKey();
             StreamStateHandle remoteFileHandle = entry.getValue();
-            copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
+
+            FutureTask task = new FutureTask<>(() -> {

Review comment:
I think it would be more maintainable if we extracted these 3 methods:
- `transferAllStateDataToDirectory`
- `transferAllDataFromStateHandles`
- `copyStateDataHandleData`
into a separate class file, e.g. `RocksDbStateDataTransfer`. `RocksDBKeyedStateBackend` already contains a lot.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639910#comment-16639910 ]

ASF GitHub Bot commented on FLINK-10461:

azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r223029260

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -1286,10 +1289,38 @@ private void transferAllDataFromStateHandles(
         Map stateHandleMap,
         Path restoreInstancePath) throws IOException {

+        ExecutorService executorService = Executors.newFixedThreadPool(5);

Review comment:
The number of threads should be configurable, for example in the same way as incremental checkpointing: at least in the RocksDBStateBackend constructor and in CheckpointingOptions. The default behaviour (e.g. number of threads = 1) can stay as it is now, basically running in the current thread.
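
One way to read the suggestion above is sketched below: the thread count is passed in, and a value of 1 keeps everything on the calling thread without creating a pool. The class, the constant `DEFAULT_THREAD_NUM` and the method names are assumptions made for illustration; how the value would actually be wired through the RocksDBStateBackend constructor or CheckpointingOptions is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConfigurableThreadsSketch {

    /** Hypothetical default: one thread, i.e. the behaviour before the change. */
    static final int DEFAULT_THREAD_NUM = 1;

    /** Runs all tasks with the configured number of threads and collects the results in order. */
    static <T> List<T> runAll(List<Callable<T>> tasks, int threadNum) {
        if (threadNum < 1) {
            throw new IllegalArgumentException("threadNum must be >= 1");
        }
        List<T> results = new ArrayList<>(tasks.size());

        if (threadNum == 1) {
            // Default path: no pool, every task runs in the calling thread.
            for (Callable<T> task : tasks) {
                try {
                    results.add(task.call());
                } catch (Exception e) {
                    throw new IllegalStateException("Download failed", e);
                }
            }
            return results;
        }

        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        try {
            List<Future<T>> futures = new ArrayList<>(tasks.size());
            for (Callable<T> task : tasks) {
                futures.add(executorService.submit(task));
            }
            for (Future<T> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Download failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while downloading", e);
        } finally {
            executorService.shutdown();
        }
    }

    /** Demo helper: each task reports the name of the thread that executed it. */
    static List<String> threadNamesFor(int taskCount, int threadNum) {
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(() -> Thread.currentThread().getName());
        }
        return runAll(tasks, threadNum);
    }

    public static void main(String[] args) {
        System.out.println("default: " + threadNamesFor(3, DEFAULT_THREAD_NUM));
        System.out.println("pooled:  " + threadNamesFor(3, 4));
    }
}
```

The two branches duplicate the loop over tasks; running the single-thread case through an executor that executes in the caller's thread would let both cases share one code path.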
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632738#comment-16632738 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-425609212

cc @StefanRRichter
I use 5 threads for downloading files because I think 5 is enough for many scenarios, and in my company's experience 5 threads works well.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632735#comment-16632735 ]

ASF GitHub Bot commented on FLINK-10461:

klion26 opened a new pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777

## What is the purpose of the change

This PR speeds up downloading files when restoring from DFS by using multiple threads.

## Brief change log

Use multiple threads to download files when restoring from DFS.

## Verifying this change

Add a unit test for exception rethrowing in the thread pool.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
- The serializers: (**no**)
- The runtime per-record code paths (performance sensitive): (**no**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
- The S3 file system connector: (**no**)

## Documentation

- Does this pull request introduce a new feature? (**no**)
- If yes, how is the feature documented? (**not applicable**)