shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1275550987


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskStorageManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well.
+              // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store
+              // and creates a new checkpoint.
+              ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true);

Review Comment:
   Confirmed that calling `init` twice is safe in our implementation of BlobStoreManager.
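
For readers without the full PR at hand, here is a minimal sketch of the pattern under discussion: a first `init` attempt that fails because the blob was deleted, followed by a second `init` call on the same manager with a get-deleted flag. Every name below (`DeletedException`, `SketchRestoreManager`, `InitRetrySketch`, `initWithFallback`) is a hypothetical stand-in, not Samza's actual API, and the sketch assumes the first attempt fails before mutating any state, which is one way a second call can be safe.

```java
/** Hypothetical stand-in for the "blob already deleted" failure; not a Samza class. */
class DeletedException extends RuntimeException {
  DeletedException(String message) {
    super(message);
  }
}

/** Hypothetical restore manager whose init may be called again after a failed attempt. */
class SketchRestoreManager {
  private final boolean blobDeleted;
  private boolean initialized = false;
  private boolean usedGetDeleted = false;
  private int initCalls = 0;

  SketchRestoreManager(boolean blobDeleted) {
    this.blobDeleted = blobDeleted;
  }

  void init(String checkpoint) {
    init(checkpoint, false);
  }

  void init(String checkpoint, boolean getDeleted) {
    initCalls++;
    if (blobDeleted && !getDeleted) {
      // Assumption: the failure happens before any state is mutated,
      // so a later retry starts from a clean slate.
      throw new DeletedException("snapshot index for " + checkpoint + " was deleted");
    }
    initialized = true;
    usedGetDeleted = getDeleted;
  }

  boolean isInitialized() {
    return initialized;
  }

  boolean usedGetDeleted() {
    return usedGetDeleted;
  }

  int initCalls() {
    return initCalls;
  }
}

class InitRetrySketch {
  /** Returns true if the manager had to fall back to the get-deleted path. */
  static boolean initWithFallback(SketchRestoreManager manager, String checkpoint) {
    try {
      manager.init(checkpoint);
      return false;
    } catch (DeletedException ex) {
      // Second call on the same instance, this time allowed to read deleted blobs.
      manager.init(checkpoint, true);
      return true;
    }
  }
}
```

In this sketch the caller can use the boolean result to record which tasks need a forced restore, similar in spirit to the `forceRestoreTasks` set in the diff above.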



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
