prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r614965895
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -256,6 +254,10 @@ public ContainerStorageManager(
         containerChangelogSystems, systemFactories, config, this.samzaContainerMetrics.registry());
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
+    // TODO HIGH dchen tune based on observed concurrency
+    this.restoreExecutor = Executors.newFixedThreadPool(containerModel.getTasks().size(),
Review comment:
@dxichen I think this still needs to be 2x until you make RestoreManager init() and restore() nonblocking.

Where is RestoreManager init() (not store init) called, btw? Is it on this executor as well? Since we're deleting old stores during restore init, ideally that should be parallel and nonblocking too.
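For illustration of the sizing concern above, here is a minimal, self-contained sketch (hypothetical names like `initTask`/`restoreTask` and `restoreAll`, not the actual Samza API): while init() and restore() are blocking calls, each task occupies a worker thread for its whole init-then-restore sequence, so the reviewer's suggested 2x-per-task pool size leaves headroom for the other blocking work (e.g. deleting old stores) that lands on the same executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RestorePoolSketch {
  // Hypothetical stand-ins for a blocking restore manager: init() deletes
  // old store dirs, restore() replays the changelog; both block the thread.
  static void initTask(int task) { /* blocking cleanup of old stores */ }
  static void restoreTask(int task) { /* blocking changelog replay */ }

  // Runs init() then restore() for each task on a shared executor and
  // returns the number of tasks restored.
  static int restoreAll(int numTasks) throws Exception {
    // 2 * numTasks: headroom while init() and restore() are blocking,
    // per the review comment; tune once the calls become nonblocking.
    ExecutorService restoreExecutor = Executors.newFixedThreadPool(2 * numTasks);
    List<Future<?>> futures = new ArrayList<>();
    for (int t = 0; t < numTasks; t++) {
      final int task = t;
      futures.add(restoreExecutor.submit(() -> {
        initTask(task);     // must complete before restore for this task
        restoreTask(task);
      }));
    }
    for (Future<?> f : futures) {
      f.get();  // wait for completion and propagate any failure
    }
    restoreExecutor.shutdown();
    return numTasks;
  }

  public static void main(String[] args) throws Exception {
    System.out.println("restored " + restoreAll(4) + " tasks");
  }
}
```

Note the per-task ordering (init before restore) is kept inside a single callable, while tasks themselves run in parallel across the pool.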
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -643,15 +652,11 @@ private void restoreStores() throws InterruptedException {
     // Start each store consumer once
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-    // Create a thread pool for parallel restores (and stopping of persistent stores)
-    ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
     List<Future> taskRestoreFutures = new ArrayList<>(this.taskRestoreManagers.entrySet().size());

     // Submit restore callable for each taskInstance
     this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
-      taskRestoreFutures.add(executorService.submit(
+      taskRestoreFutures.add(restoreExecutor.submit(
Review comment:
What's the relationship between parallelRestoreThreadPoolSize and the new executor's pool size? Are they the same?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]