shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1286390522


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -301,64 +300,14 @@ private void restoreStores() throws InterruptedException {
               samzaContainerMetrics, taskInstanceMetrics, 
taskInstanceCollectors, serdes,
               loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, 
clock);
       taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+      taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
     });
 
-    // Initialize each TaskStorageManager
-    taskRestoreManagers.forEach((taskName, restoreManagers) ->
-        restoreManagers.forEach((factoryName, taskRestoreManager) ->
-            taskRestoreManager.init(taskCheckpoints.get(taskName))
-        )
-    );
-
-    // Start each store consumer once.
-    // Note: These consumers are per system and only changelog system store 
consumers will be started.
-    // Some TaskRestoreManagers may not require the consumer to to be started, 
but due to the agnostic nature of
-    // ContainerStorageManager we always start the changelog consumer here in 
case it is required
-    
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
-    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
-    // Submit restore callable for each taskInstance
-    taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
-      // Submit for each restore factory
-      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
-        long startTime = System.currentTimeMillis();
-        String taskName = taskInstance.getTaskName();
-        LOG.info("Starting restore for state for task: {}", taskName);
-        CompletableFuture<Void> restoreFuture = 
taskRestoreManager.restore().handle((res, ex) -> {
-          // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} 
restore",
-                taskName, ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be be able to 
continue processing/backups
-            // if restore manager close fails.
-          }
-
-          long timeToRestore = System.currentTimeMillis() - startTime;
-          if (samzaContainerMetrics != null) {
-            Gauge taskGauge = 
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, 
null);
-
-            if (taskGauge != null) {
-              taskGauge.set(timeToRestore);
-            }
-          }
-
-          if (ex != null) {
-            // log and rethrow exception to communicate restore failure
-            String msg = String.format("Error restoring state for task: %s", 
taskName);
-            LOG.error(msg, ex);
-            throw new SamzaException(msg, ex); // wrap in unchecked exception 
to throw from lambda
-          } else {
-            return null;
-          }
-        });
-
-        taskRestoreFutures.add(restoreFuture);
-      });
-    });
+    // Init all taskRestores and if successful, create a future for restores 
for each task
+    List<Future<Void>> taskRestoreFutures =

Review Comment:
   Updated it to return a map of task names to checkpoints rather than mutating the input.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to