prateekm commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1288927853


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, 
SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, 
ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, 
Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskRestoreManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager 
instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the 
task to be restored with GetDeleted as well.
+              // This ensures that the restore downloads the snapshot, recreates a new snapshot,
+              // uploads it to the blob store, and creates a new checkpoint.
+              ((BlobStoreRestoreManager) 
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+              forceRestoreTasks.add(taskName.getTaskName());
+            } else {
+              // log and rethrow exception to communicate restore failure
+              String msg = String.format("init failed for task: %s with 
GetDeleted set to true", taskName);
+              LOG.error(msg, ex);
+              throw new SamzaException(msg, ex);
+            }
+          }
+        })
+    );
+
+    // Start each store consumer once.
+    // Note: These consumers are per system and only changelog system store 
consumers will be started.
+    // Some TaskRestoreManagers may not require the consumer to be started, 
but due to the agnostic nature of
+    // ContainerStorageManager we always start the changelog consumer here in 
case it is required
+    storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+    return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints, 
taskBackendFactoryToStoreNames, jobContext,
+        containerModel, samzaContainerMetrics, checkpointManager, config, 
taskInstanceMetrics, executor, loggerStoreDir,
+        forceRestoreTasks);
+  }
+
+  /**
+   * Restores all TaskInstances and returns a future for each TaskInstance restore.
+   * If a restore fails with a DeletedException, it is retried with the getDeleted flag:
+   * all the blobs are fetched, a new checkpoint is recreated in the blob store, and
+   * that checkpoint is written to the checkpoint topic.
+   * @param taskRestoreManagers map of TaskName to factory name to 
TaskRestoreManager map.
+   */
+  public static List<Future<Void>> restoreAllTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, 
Map<TaskName, Checkpoint> taskCheckpoints,
+      Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, 
JobContext jobContext,
+      ContainerModel containerModel, SamzaContainerMetrics 
samzaContainerMetrics, CheckpointManager checkpointManager,
+      Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
ExecutorService executor,
+      File loggedStoreDir, Set<String> forceRestoreTask) {
+
+    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+    // Submit restore callable for each taskInstance
+    taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+      // Submit for each restore factory
+      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+        long startTime = System.currentTimeMillis();
+        String taskName = taskInstanceName.getTaskName();
+        LOG.info("Starting restore for state for task: {}", taskName);
+
+        CompletableFuture<Void> restoreFuture;
+        if (forceRestoreTask.contains(taskName) && taskRestoreManager 
instanceof BlobStoreRestoreManager) {
+          restoreFuture = ((BlobStoreRestoreManager) 
taskRestoreManager).restore(true);
+        } else {
+          restoreFuture = taskRestoreManager.restore();
+        }
+
+        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+          updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
+
+          if (ex != null) {
+            if (isUnwrappedExceptionDeletedException(ex)) {
+              LOG.warn(
+                  "Restore state for task: {} received DeletedException. 
Reattempting with getDeletedBlobs enabled",
+                  taskInstanceName.getTaskName());
+
+              // Try to restore with getDeleted flag
+              CompletableFuture<Void> future =
+                  
restoreTaskStoresAndCreateCheckpointWithGetDeleted(taskInstanceName, 
taskCheckpoints, checkpointManager,

Review Comment:
   LGTM, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to