prateekm commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1273936574


##########
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java:
##########
@@ -93,14 +93,18 @@ public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager
     this.metrics = metrics;
   }
 
-  public void init() {
+  public void init(Checkpoint checkpoint) {
     // Assuming that container storage manager has already started and created to stores
     storageEngines = containerStorageManager.getAllStores(taskName);
     if (checkpointManager != null) {
-      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      // In case of standby containers

Review Comment:
   Minor: conditions can be simplified. checkpointManager != null only needs to be checked if checkpoint == null.
   Also, prefer not reassigning input params; assign to finalCheckpoint instead.
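A minimal sketch of the suggested shape, written against hypothetical stand-ins for `Checkpoint` and `CheckpointManager` (only `readLastCheckpoint(taskName)` comes from the diff; the task name is simplified to a String, and `resolveCheckpoint` is an illustrative helper). The manager is consulted only when no checkpoint was passed in, and the input parameter is never reassigned:

```java
final class CommitInitSketch {
  // Hypothetical stand-ins for the Samza types; only what the sketch needs.
  interface Checkpoint { }

  interface CheckpointManager {
    Checkpoint readLastCheckpoint(String taskName);
  }

  /**
   * Returns the checkpoint to initialize from. The checkpoint manager is only checked
   * (and read) when the caller passed no checkpoint, e.g. for standby tasks.
   */
  static Checkpoint resolveCheckpoint(Checkpoint checkpoint, CheckpointManager checkpointManager, String taskName) {
    final Checkpoint finalCheckpoint = (checkpoint == null && checkpointManager != null)
        ? checkpointManager.readLastCheckpoint(taskName)
        : checkpoint;
    return finalCheckpoint;
  }
}
```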



##########
samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java:
##########
@@ -49,11 +49,12 @@ public interface BlobStoreManager {
    * @param id Blob ID of the blob to get
    * @param outputStream OutputStream to write the downloaded blob
    * @param metadata User supplied {@link Metadata} of the request
+   * @param getDeletedBlob Flag to indicate if get should try to get a blob marked for deletion but not yet compacted
    * @return A future that completes when all the chunks are downloaded and written successfully to the OutputStream
    * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException returned future should complete
    *         exceptionally with DeletedException on failure with the blob already deleted error.
    */
-  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata);
+  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, Boolean getDeletedBlob);

Review Comment:
   Minor: boolean, to avoid accidentally passing null values
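A small illustration of the risk, using hypothetical `getBoxed` / `getPrimitive` helpers rather than the actual `BlobStoreManager.get`: a `Boolean` parameter accepts `null` and only fails when it is unboxed at runtime, whereas a `boolean` parameter rejects `null` at compile time.

```java
final class BoxedFlagSketch {
  // Boxed parameter: null compiles, but unboxing it in the condition throws NullPointerException.
  static String getBoxed(String id, Boolean getDeletedBlob) {
    return getDeletedBlob ? "get-deleted:" + id : "get:" + id;
  }

  // Primitive parameter: getPrimitive(id, null) does not compile.
  static String getPrimitive(String id, boolean getDeletedBlob) {
    return getDeletedBlob ? "get-deleted:" + id : "get:" + id;
  }
}
```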



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
 
   @Override
   public void init(Checkpoint checkpoint) {
+    init(checkpoint, false);
+  }
+
+  public void init(Checkpoint checkpoint, Boolean getDeletedBlob) {

Review Comment:
   Do we need both init variants / why do we need both to be public? 
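If both variants stay, one option is to let the javadoc and an inline comment carry the reasoning. Note from the file paths in this review that the fallback caller (`ContainerStorageManagerUtil.java`, package `org.apache.samza.storage`) and `BlobStoreRestoreManager` (package `org.apache.samza.storage.blobstore`) sit in different packages, so a package-private flagged variant would not be visible to it. A sketch with hypothetical stand-in types (the real `TaskRestoreManager` has more methods):

```java
final class RestoreManagerVariantsSketch {
  // Hypothetical stand-ins for the Samza types.
  interface Checkpoint { }

  interface TaskRestoreManager {
    void init(Checkpoint checkpoint);
  }

  static final class BlobStoreRestoreManagerSketch implements TaskRestoreManager {
    boolean getDeletedBlobRequested;

    @Override
    public void init(Checkpoint checkpoint) {
      // false: on the normal path, blobs marked for deletion are not read, so a checkpoint that
      // references them fails init (with a wrapped DeletedException) and the caller decides whether to retry.
      init(checkpoint, false);
    }

    /**
     * Initializes the restore manager, optionally reading blobs that are marked for deletion but
     * not yet compacted. Public only because the caller that retries with getDeletedBlob = true
     * lives in a different package.
     */
    public void init(Checkpoint checkpoint, boolean getDeletedBlob) {
      this.getDeletedBlobRequested = getDeletedBlob;
    }
  }
}
```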



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -153,6 +158,10 @@ public CompletableFuture<Void> restore() {
         storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
   }
 
+  public CompletableFuture<Void> restore(Boolean restoreDeleted) {

Review Comment:
   Add javadoc to public method. Why do we need both variants of restore?



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -162,15 +167,7 @@ public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
     }
 
     try {
-      return FutureUtil.toFutureOfMap(t -> {
-        Throwable unwrappedException = FutureUtil.unwrapExceptions(CompletionException.class, t);
-        if (unwrappedException instanceof DeletedException) {
-          LOG.warn("Ignoring already deleted snapshot index for taskName: {}", taskName, t);
-          return true;
-        } else {
-          return false;
-        }
-      }, storeSnapshotIndexFutures).join();
+      return FutureUtil.toFutureOfMap(storeSnapshotIndexFutures).join();

Review Comment:
   Not obvious to me: can you explain the difference in error handling behavior before / after? What does the predicate param to toFutureOfMap do?
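The diff does not show `FutureUtil.toFutureOfMap`, so the following is a hypothetical sketch of what a predicate-taking variant could do, inferred from the removed lambda (which logged and returned true for an unwrapped `DeletedException`): entries whose failure matches the predicate are dropped, and any other failure fails the combined future. Under that reading, the new single-argument call would fail the whole `join()` on an already-deleted snapshot index instead of skipping it; this is worth confirming against the real `FutureUtil`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;

final class FutureOfMapSketch {
  /**
   * Hypothetical: combines a map of futures into a future of a map. An entry whose future fails
   * with a cause matching ignoreError is dropped; any other failure fails the combined future.
   */
  static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
      Predicate<Throwable> ignoreError, Map<K, CompletableFuture<V>> futures) {
    CompletableFuture<?>[] all = futures.values().toArray(new CompletableFuture<?>[0]);
    // allOf's own outcome is ignored; each entry is inspected individually once all have completed.
    return CompletableFuture.allOf(all).handle((ignoredResult, ignoredError) -> {
      Map<K, V> result = new HashMap<>();
      for (Map.Entry<K, CompletableFuture<V>> entry : futures.entrySet()) {
        try {
          result.put(entry.getKey(), entry.getValue().join());
        } catch (CompletionException ce) {
          Throwable cause = ce.getCause() != null ? ce.getCause() : ce;
          if (!ignoreError.test(cause)) {
            throw ce; // non-ignorable failure: fail the combined future
          }
        }
      }
      return result;
    });
  }
}
```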



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -329,11 +340,11 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde
   @VisibleForTesting
   static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex,
       long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures, BlobStoreUtil blobStoreUtil,
-      DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor) {
+      DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor, Boolean getDeleted) {

Review Comment:
   boolean



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
 
   @Override
   public void init(Checkpoint checkpoint) {
+    init(checkpoint, false);

Review Comment:
   Why false? Document inline.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -232,6 +288,15 @@ public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId,
    * @return A future that completes when all the async downloads completes
    */
   public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex, Metadata metadata) {
+    return restoreDir(baseDir, dirIndex, metadata, false);
+  }
+
+  /**
+   * Non-blocking restore of a {@link SnapshotIndex} to local store by downloading all the files and sub-dirs associated
+   * with this remote snapshot. getDeletedFiles flag sets whether to attempt a get for deletedFiles or not.

Review Comment:
   Minor: update doc for getDeletedFiles flag (explain why / when to use, not "getDeletedFiles gets deleted files" etc.)
   



##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -981,7 +981,7 @@ class SamzaContainer(
     }
   }
 
-  def startStores {
+  def startStores: util.Map[TaskName, Checkpoint] = {

Review Comment:
   Add documentation for what the return value represents.



##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -993,10 +993,14 @@ class SamzaContainer(
     })
   }
 
-  def startTask {
+  def startTask(taskCheckpoints: util.Map[TaskName, Checkpoint]) {

Review Comment:
   Add documentation here and later on for what the param represents.
   
   Esp. clarify relationship to startpoints.



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {
     initCaughtUpMapping()
 
+    val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
     if (commitManager != null) {
       debug("Starting commit manager for taskName: %s" format taskName)
-
-      commitManager.init()
+      commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)

Review Comment:
   Minor: Prefer not inlining expressions as params. Assign explicitly.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -208,9 +205,9 @@ public ContainerStorageManager(
   }
 
 
-  public void start() throws SamzaException, InterruptedException {
+  public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedException {

Review Comment:
   Document return value.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -153,6 +158,10 @@ public CompletableFuture<Void> restore() {
         storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
   }
 
+  public CompletableFuture<Void> restore(Boolean restoreDeleted) {
+    return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes,
+        loggedBaseDir, storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
+  }

Review Comment:
   Newline after.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -301,64 +300,14 @@ private void restoreStores() throws InterruptedException {
               samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
               loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
       taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+      taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
     });
 
-    // Initialize each TaskStorageManager
-    taskRestoreManagers.forEach((taskName, restoreManagers) ->
-        restoreManagers.forEach((factoryName, taskRestoreManager) ->
-            taskRestoreManager.init(taskCheckpoints.get(taskName))
-        )
-    );
-
-    // Start each store consumer once.
-    // Note: These consumers are per system and only changelog system store consumers will be started.
-    // Some TaskRestoreManagers may not require the consumer to to be started, but due to the agnostic nature of
-    // ContainerStorageManager we always start the changelog consumer here in case it is required
-    this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
-    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
-    // Submit restore callable for each taskInstance
-    taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
-      // Submit for each restore factory
-      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
-        long startTime = System.currentTimeMillis();
-        String taskName = taskInstance.getTaskName();
-        LOG.info("Starting restore for state for task: {}", taskName);
-        CompletableFuture<Void> restoreFuture = taskRestoreManager.restore().handle((res, ex) -> {
-          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} restore",
-                taskName, ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be be able to continue processing/backups
-            // if restore manager close fails.
-          }
-
-          long timeToRestore = System.currentTimeMillis() - startTime;
-          if (samzaContainerMetrics != null) {
-            Gauge taskGauge = samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, null);
-
-            if (taskGauge != null) {
-              taskGauge.set(timeToRestore);
-            }
-          }
-
-          if (ex != null) {
-            // log and rethrow exception to communicate restore failure
-            String msg = String.format("Error restoring state for task: %s", taskName);
-            LOG.error(msg, ex);
-            throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
-          } else {
-            return null;
-          }
-        });
-
-        taskRestoreFutures.add(restoreFuture);
-      });
-    });
+    // Init all taskRestores and if successful, create a future for restores for each task

Review Comment:
   "Init all *task stores*, and if successful, (concurrently?) restore each task store".

   Minor: Generally should not say "create a future for" since Future is just the return type, not the main action. Same reason you'd say "get members" and not "create a list for members".
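A minimal sketch of that two-phase shape, using a hypothetical `RestoreManager` stand-in (not Samza's `TaskRestoreManager`) and String task names: every restore manager is initialized first, and restores are started only if all inits succeed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class InitThenRestoreSketch {
  // Hypothetical stand-in for a per-store restore manager.
  interface RestoreManager {
    void init();
    CompletableFuture<Void> restore();
  }

  /**
   * Initializes all task stores; if every init succeeds, restores each task store.
   * All restores are started before any is awaited, so they run concurrently whenever
   * restore() is asynchronous. The returned future completes when every restore completes.
   */
  static CompletableFuture<Void> initAndRestore(Map<String, List<RestoreManager>> managersByTask) {
    // Phase 1: init. An exception here propagates before any restore has started.
    managersByTask.values().forEach(managers -> managers.forEach(RestoreManager::init));

    // Phase 2: start every restore, then combine them.
    List<CompletableFuture<Void>> restores = new ArrayList<>();
    managersByTask.values().forEach(managers -> managers.forEach(m -> restores.add(m.restore())));
    return CompletableFuture.allOf(restores.toArray(new CompletableFuture<?>[0]));
  }
}
```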



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -212,9 +213,17 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
       File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
       StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
       ExecutorService executor) {
+    return restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir, storageConfig,
+        metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, false);
+  }
+
+  public static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,

Review Comment:
   Why 2 variants? Previous method wasn't public. Why does this need to be?
   



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {

Review Comment:
   Also document that this is nullable (or better, pass a Scala Option instead).
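A Java analog of the suggestion, with `java.util.Optional` standing in for Scala's `Option`; it also assigns the commit manager's argument explicitly instead of inlining the conditional. `commitManagerCheckpoint` is a hypothetical helper and `Checkpoint` a stand-in type:

```java
import java.util.Optional;

final class OptionalCheckpointSketch {
  // Hypothetical stand-in for the Samza Checkpoint type.
  interface Checkpoint { }

  /**
   * @param lastTaskCheckpoint the last checkpoint for this task, or empty if none is available
   * @param isStandbyTask whether the task runs in standby mode
   * @return the checkpoint to hand to the commit manager (null for standby tasks or when absent)
   */
  static Checkpoint commitManagerCheckpoint(Optional<Checkpoint> lastTaskCheckpoint, boolean isStandbyTask) {
    // Assigned explicitly rather than inlined as an argument to init(...).
    Checkpoint checkpoint = isStandbyTask ? null : lastTaskCheckpoint.orElse(null);
    return checkpoint;
  }
}
```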



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskStorageManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well.
+              // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store
+              // and creates a new checkpoint.
+              ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+              forceRestoreTasks.add(taskName.getTaskName());
+            } else {
+              // log and rethrow exception to communicate restore failure
+              String msg = String.format("init failed for task: %s with GetDeleted set to true", taskName);

Review Comment:
   Failed for restore manager, not for task.
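A sketch of the init-with-fallback flow above, assuming a hypothetical `BlobRestoreManager` stand-in and a local `DeletedException` class in place of the real one. The error message names the restore manager's factory as well as the task. It also omits "with GetDeleted set to true": in the diff, the `else` branch runs when the first `init` call (made without GetDeleted) fails and the fallback does not apply.

```java
final class InitFallbackSketch {
  // Hypothetical stand-in for org.apache.samza.storage.blobstore.exceptions.DeletedException.
  static final class DeletedException extends RuntimeException {
    DeletedException(String msg) { super(msg); }
  }

  // Hypothetical stand-in for a blob-store-backed restore manager.
  interface BlobRestoreManager {
    void init(boolean getDeletedBlob);
  }

  /** Walks the cause chain, so a DeletedException wrapped in CompletionException or SamzaException still matches. */
  static boolean isCausedByDeleted(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (cur instanceof DeletedException) {
        return true;
      }
    }
    return false;
  }

  /** Returns true if the restore manager had to fall back to reading blobs marked for deletion. */
  static boolean initWithFallback(String taskName, String factoryName, BlobRestoreManager manager) {
    try {
      manager.init(false);
      return false;
    } catch (RuntimeException ex) {
      if (!isCausedByDeleted(ex)) {
        // A task can have several restore managers, so identify which one failed.
        throw new RuntimeException(String.format(
            "Init failed for restore manager: %s of task: %s", factoryName, taskName), ex);
      }
      manager.init(true);
      return true;
    }
  }
}
```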
   



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,8 +263,10 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
         throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
       }
 
+      // If getDeletedBlobs is enabled - always restore so that we get all the blobs, including the deleted blobs,
+      // immediately restore it locally and backup to create new checkpoint.
       boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex,
-          storeCheckpointDir, storageConfig, dirDiffUtil);
+          storeCheckpointDir, storageConfig, dirDiffUtil) || getDeletedBlobs;

Review Comment:
   Move getDeletedBlob check to shouldRestore and update javadoc.
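A sketch of folding the flag into `shouldRestore`, with a javadoc that describes behavior rather than implementation. `localStateMatchesRemote` is a hypothetical simplification standing in for the checkpoint-directory comparison the real method performs; the rationale in the javadoc follows the inline comments in this diff (restore now, then back up to create a new checkpoint).

```java
final class ShouldRestoreSketch {
  /**
   * Decides whether a store should be restored from the blob store.
   *
   * @param localStateMatchesRemote hypothetical stand-in for the existing checks: whether the local
   *     checkpoint directory already matches the remote snapshot
   * @param getDeletedBlobs whether the restore may read blobs that are marked for deletion but not yet
   *     compacted. When set, the store is always restored, so those blobs are downloaded now and can be
   *     backed up again under a new checkpoint before they are compacted away.
   * @return true if the store should be restored
   */
  static boolean shouldRestore(boolean localStateMatchesRemote, boolean getDeletedBlobs) {
    if (getDeletedBlobs) {
      return true;
    }
    return !localStateMatchesRemote;
  }
}
```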



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -114,6 +114,11 @@ public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor
    */
   public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
       String jobName, String jobId, String taskName, Checkpoint checkpoint, Set<String> storesToBackupOrRestore) {
+    return getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, storesToBackupOrRestore, false);
+  }
+
+  public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(String jobName, String jobId, String taskName,

Review Comment:
   Same as other comments re: variants.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {

Review Comment:
   Same as other comments re: variants.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -183,11 +180,20 @@ public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
    * @return a Future containing the {@link SnapshotIndex}
    */
   public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) {
+    return getSnapshotIndex(blobId, metadata, false);
+  }
+
+  /**
+   * GETs the {@link SnapshotIndex} from the blob store.
+   * @param blobId blob ID of the {@link SnapshotIndex} to get
+   * @return a Future containing the {@link SnapshotIndex}
+   */
+  public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata, Boolean getDeletedBlob) {

Review Comment:
   Same as other comments re: variants.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+    Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+    SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+    return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param snapshotIndex SnapshotIndex to delete
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+    DirIndex dirIndex = snapshotIndex.getDirIndex();
+    CompletionStage<Void> storeDeletionFuture =
+        cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+            .thenComposeAsync(v ->
+                deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
+            .thenComposeAsync(v -> deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // delete the snapshot index blob
+            .exceptionally(ex -> {
+              if (ex instanceof DeletedException) {

Review Comment:
   Document error handling behavior (explain what it should do / is trying to do).

   Do you need to unwrap CompletionException here?
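A minimal demonstration, with a local stand-in for `DeletedException`: when `exceptionally` is attached after dependent stages (as in the `thenComposeAsync` chain above), the JDK delivers the upstream failure wrapped in a `CompletionException`, so a bare `ex instanceof DeletedException` check does not match. The removed code in `getStoreSnapshotIndexes` already unwrapped with `FutureUtil.unwrapExceptions(CompletionException.class, t)`; the `unwrap` helper below is a simplified, hypothetical helper in the same spirit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class UnwrapSketch {
  // Hypothetical stand-in for org.apache.samza.storage.blobstore.exceptions.DeletedException.
  static final class DeletedException extends RuntimeException {
    DeletedException(String msg) { super(msg); }
  }

  /** Strips the CompletionException wrappers added as a failure propagates through dependent stages. */
  static Throwable unwrap(Throwable t) {
    Throwable cur = t;
    while (cur instanceof CompletionException && cur.getCause() != null) {
      cur = cur.getCause();
    }
    return cur;
  }

  /** Runs a dependent stage after deleteStep and ignores an already-deleted failure. */
  static CompletableFuture<String> cleanUp(CompletableFuture<Void> deleteStep) {
    return deleteStep
        .thenApply(v -> "deleted")
        .exceptionally(ex -> {
          // ex is a CompletionException here, so check the unwrapped cause instead.
          if (unwrap(ex) instanceof DeletedException) {
            return "already deleted, ignored";
          }
          throw new CompletionException("Error cleaning up", unwrap(ex));
        });
  }
}
```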



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+    Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+    SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+    return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param snapshotIndex SnapshotIndex to delete
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+    DirIndex dirIndex = snapshotIndex.getDirIndex();
+    CompletionStage<Void> storeDeletionFuture =
+        cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+            .thenComposeAsync(v ->
+                deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
+            .thenComposeAsync(v -> deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // delete the snapshot index blob
+            .exceptionally(ex -> {
+              if (ex instanceof DeletedException) {
+                LOG.warn("DeletedException received on trying to clean up SnapshotIndex {}. Ignoring the error.",
+                    snapshotIndexBlobId);
+                return null;
+              }
+              String msg = String.format("Error deleting/cleaning up SnapshotIndex: %s", snapshotIndexBlobId);

Review Comment:
   Same as above, add more context.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {

Review Comment:
   Minor: boolean.
   



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set

Review Comment:
   Minor: document behavior, not impl. E.g. "determines whether to attempt to get deleted snapshot index blobs or not." or something.



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {
     initCaughtUpMapping()
 
+    val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
     if (commitManager != null) {
       debug("Starting commit manager for taskName: %s" format taskName)
-
-      commitManager.init()
+      commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)

Review Comment:
   Document what this param is and its relationship to startpoints in CommitManager interface.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+    Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+    SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+    return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex

Review Comment:
   Minor: Blob id (typo)



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, Metadata requestMetadata, Boolean getDeletedBlob) {
+    Metadata getSnapshotRequest = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), requestMetadata.getStoreName());
+    SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, getDeletedBlob).join();
+    return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, requestMetadata);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param snapshotIndex SnapshotIndex to delete
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+    DirIndex dirIndex = snapshotIndex.getDirIndex();
+    CompletionStage<Void> storeDeletionFuture =
+        cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+            .thenComposeAsync(v ->
+                deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
+            .thenComposeAsync(v -> deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // delete the snapshot index blob
+            .exceptionally(ex -> {
+              if (ex instanceof DeletedException) {
+                LOG.warn("DeletedException received on trying to clean up SnapshotIndex {}. Ignoring the error.",

Review Comment:
   Is there any more information we need to log, e.g. store name / task name?
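One way to act on this, sketched under assumptions: `DeletedException` and `RequestInfo` below are hypothetical stand-ins (the real code would read job name, job id, task name and store name from `requestMetadata` through the getters used earlier in this hunk), and warnings go to a `Consumer<String>` instead of `LOG`. The sketch also unwraps `CompletionException`, because a failure from an earlier stage of a `thenComposeAsync` chain typically reaches `exceptionally` wrapped, so a plain `ex instanceof DeletedException` check may not match it.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

// Hypothetical stand-in for the blob store's "already deleted" error.
class DeletedException extends RuntimeException {
  DeletedException(String message) { super(message); }
}

// Hypothetical stand-in carrying only the request fields worth logging (not Samza's Metadata).
final class RequestInfo {
  final String jobName, jobId, taskName, storeName;

  RequestInfo(String jobName, String jobId, String taskName, String storeName) {
    this.jobName = jobName;
    this.jobId = jobId;
    this.taskName = taskName;
    this.storeName = storeName;
  }
}

class CleanupLogSketch {
  // Dependent stages usually deliver failures wrapped in CompletionException, so unwrap first.
  static Throwable unwrap(Throwable t) {
    Throwable cur = t;
    while (cur instanceof CompletionException && cur.getCause() != null) {
      cur = cur.getCause();
    }
    return cur;
  }

  // Ignores DeletedException but records which job/task/store it happened for; rethrows anything else.
  static CompletionStage<Void> ignoreDeleted(CompletionStage<Void> deletion, String snapshotIndexBlobId,
      RequestInfo req, Consumer<String> warn) {
    return deletion.exceptionally(ex -> {
      Throwable cause = unwrap(ex);
      if (cause instanceof DeletedException) {
        warn.accept(String.format(
            "DeletedException received on trying to clean up SnapshotIndex %s for job: %s, jobId: %s, task: %s, store: %s. Ignoring the error.",
            snapshotIndexBlobId, req.jobName, req.jobId, req.taskName, req.storeName));
        return null;
      }
      throw new CompletionException(cause);
    });
  }
}
```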



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -397,10 +462,11 @@ public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) {
    * @param fileBlobs List of {@link FileBlob}s that constitute this file.
    * @param fileToRestore File pointing to the local path where the file will be restored.
    * @param requestMetadata {@link Metadata} associated with this request
+   * @param getDeletedFiles Flag that indicates whether to try to get Deleted (but not yet compacted) files.
    * @return a future that completes when the file is downloaded and written or if an exception occurs.
    */
   @VisibleForTesting
-  CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File fileToRestore, Metadata requestMetadata) {
+  CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File fileToRestore, Metadata requestMetadata, Boolean getDeletedFiles) {

Review Comment:
   Minor: boolean
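A small illustration of why primitive `boolean` is preferred for these flags: with a boxed `Boolean` parameter, `null` compiles and only fails with a `NullPointerException` when the value is unboxed, whereas with `boolean` the same mistake is a compile error. The class and method names are hypothetical.

```java
class FlagParamSketch {
  // Boxed parameter: callers may pass null, and the unboxing in the condition throws at runtime.
  static String getWithBoxedFlag(String id, Boolean getDeletedBlob) {
    return getDeletedBlob ? id + " (including deleted)" : id;
  }

  // Primitive parameter: getWithPrimitiveFlag("blob-1", null) does not compile.
  static String getWithPrimitiveFlag(String id, boolean getDeletedBlob) {
    return getDeletedBlob ? id + " (including deleted)" : id;
  }

  // Returns true if the boxed variant fails at runtime when handed null.
  static boolean boxedNullFails() {
    try {
      getWithBoxedFlag("blob-1", null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }
}
```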



##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -139,6 +139,17 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
     });
   }
 
+  /**
+   * Helper method to convert: {@code Map<K, CompletionStage<Void>>}
+   * to:                       {@code CompletableFuture<Void>}
+   *
+   * Returns a future that completes when all value futures complete.
+   * Returned future completes exceptionally if any of the value futures complete exceptionally.
+   */
+  public static CompletableFuture<Void> mapToFuture(Map<String, CompletionStage<Void>> map) {

Review Comment:
   Does this need to be different from the method above?
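For comparison, a generic sketch of what `mapToFuture` does, built on `CompletableFuture.allOf`. The name `allOfValues` is hypothetical, and since the full signature of `toFutureOfMap` is cut off above, whether that method could simply be reused is left open here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class FutureMapSketch {
  /**
   * Returns a future that completes when every value future in the map completes, and completes
   * exceptionally if any of them completes exceptionally. Key and value types are generic, so the
   * helper is not tied to {@code Map<String, CompletionStage<Void>>}.
   */
  static <K> CompletableFuture<Void> allOfValues(Map<K, ? extends CompletionStage<?>> futures) {
    List<CompletableFuture<?>> all = new ArrayList<>();
    for (CompletionStage<?> future : futures.values()) {
      all.add(future.toCompletableFuture());
    }
    return CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0]));
  }
}
```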



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {
     initCaughtUpMapping()
 
+    val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
     if (commitManager != null) {
       debug("Starting commit manager for taskName: %s" format taskName)
-
-      commitManager.init()
+      commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)
     } else {
     debug("Skipping commit manager initialization for taskName: %s" format taskName)
     }
 
-    if (offsetManager != null) {
-      val checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
-      // Only required for checkpointV2
-      if (checkpoint != null && checkpoint.getVersion == 2) {
-        val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
-        // call cleanUp on backup managers in case the container previously failed during commit
-        // before completing this step
-
-        // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
-        // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
-        // be deleted in the background by the blob store, leading to data loss.
-        info("Cleaning up stale state from previous run for taskName: %s" format taskName)
-        commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
-      }
+    var checkpoint: Checkpoint = lastTaskCheckpoint
+    if (offsetManager != null && isStandByTask) {

Review Comment:
   Document _why_ standby tasks need special handling here.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
 
   @Override
   public void init(Checkpoint checkpoint) {
+    init(checkpoint, false);
+  }
+
+  public void init(Checkpoint checkpoint, Boolean getDeletedBlob) {

Review Comment:
   Add Javadoc to this public method.
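A possible shape for the requested Javadoc, shown on a hypothetical, simplified class (checkpoints are plain strings; this is not `BlobStoreRestoreManager` itself). It documents the flag, notes that `init` may run twice when the first attempt hits a deleted SnapshotIndex blob (as in `initAndRestoreTaskInstances` further down in this review), and resets derived state on each call so that a retry is safe. It uses primitive `boolean`, in line with the other review comments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for a restore manager (not BlobStoreRestoreManager itself);
// checkpoints are modeled as plain strings.
class RestoreManagerSketch {
  private final List<String> storesToRestore = new ArrayList<>();
  private boolean getDeletedBlob;

  /**
   * Initializes this manager from the given checkpoint without reading blobs marked for deletion.
   * Equivalent to {@code init(checkpoint, false)}.
   */
  public void init(String checkpoint) {
    init(checkpoint, false);
  }

  /**
   * Initializes this manager from the given checkpoint.
   *
   * <p>May be called more than once for the same task: if the first call fails because the
   * checkpointed SnapshotIndex blob is already marked for deletion, the caller calls it again with
   * {@code getDeletedBlob = true}. Derived state is therefore rebuilt from scratch on every call.
   *
   * @param checkpoint checkpoint to initialize from
   * @param getDeletedBlob whether to also fetch blobs marked for deletion but not yet compacted
   */
  public void init(String checkpoint, boolean getDeletedBlob) {
    this.getDeletedBlob = getDeletedBlob;
    storesToRestore.clear(); // drop state from any earlier call so a retry does not duplicate it
    storesToRestore.add("store-for-" + checkpoint); // placeholder for real per-store setup
  }

  List<String> storesToRestore() {
    return Collections.unmodifiableList(storesToRestore);
  }

  boolean isGetDeletedBlob() {
    return getDeletedBlob;
  }
}
```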



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,8 +263,10 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
         throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
       }
 
+      // If getDeletedBlobs is enabled - always restore so that we get all the blobs, including the deleted blobs,
+      // immediately restore it locally and backup to create new checkpoint.
       boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex,
-          storeCheckpointDir, storageConfig, dirDiffUtil);
+          storeCheckpointDir, storageConfig, dirDiffUtil) || getDeletedBlobs;

Review Comment:
   Move the getDeletedBlob check to the beginning to short-circuit the rest.
   
   Are you sure this should short-circuit / override all the other checks in shouldRestore? It's not obvious to me. Are there scenarios where shouldRestore returns false but we want to force a restore?
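A minimal sketch of the suggested reordering: with the flag first, `||` skips the expensive checks entirely when the flag is `true`. `expensiveShouldRestore` is a hypothetical stand-in for `shouldRestore(...)`. This only addresses evaluation order; whether the flag should override those checks at all is the open question above.

```java
class ShortCircuitSketch {
  static int fullChecks = 0;

  // Hypothetical stand-in for the full shouldRestore(...) checks (directory diff, config lookups, ...).
  static boolean expensiveShouldRestore() {
    fullChecks++;
    return false;
  }

  // Flag first: when getDeletedBlobs is true, || never evaluates the right-hand side.
  static boolean decide(boolean getDeletedBlobs) {
    return getDeletedBlobs || expensiveShouldRestore();
  }
}
```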



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {

Review Comment:
   See comment above.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(

Review Comment:
   Add Javadoc. Also document behavior w.r.t. deleted blobs.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskStorageManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the task to be restored with GetDeleted as well.
+              // this ensures that the restore downloads the snapshot, recreates a new snapshot, uploads it to blob store
+              // and creates a new checkpoint.
+              ((BlobStoreRestoreManager) taskRestoreManager).init(taskCheckpoints.get(taskName), true);

Review Comment:
   Can you confirm init is safe to call twice? Also document in the BSRM init Javadoc that this can happen.
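A hypothetical sketch of the retry pattern in this hunk, which may help when documenting it: the first `init` runs with `getDeletedBlob = false`; if the failure's cause chain contains a deleted-blob error, `init` runs a second time with `true` and the task is recorded in a `forceRestoreTasks` set, mirroring the one declared in the hunk. `RestoreInit`, `DeletedBlobException` and `causedByDeleted` are illustrative stand-ins, not Samza APIs; `causedByDeleted` only plays the role of `isUnwrappedExceptionDeletedException`, whose implementation is not shown here.

```java
import java.util.Set;

// Hypothetical stand-in for the blob store's "blob already deleted" error.
class DeletedBlobException extends RuntimeException {
  DeletedBlobException(String message) { super(message); }
}

// Hypothetical, simplified view of a restore manager's init; checkpoints are plain strings.
interface RestoreInit {
  void init(String checkpoint, boolean getDeletedBlob);
}

class InitFallbackSketch {
  // Walks the cause chain, since the deleted-blob error may arrive wrapped (e.g. in a CompletionException).
  static boolean causedByDeleted(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (cur instanceof DeletedBlobException) {
        return true;
      }
    }
    return false;
  }

  // First attempt ignores deleted blobs; on a deleted-blob failure, init runs a second time with
  // getDeletedBlob = true and the task is recorded so its stores are force-restored afterwards.
  static void initWithFallback(String taskName, String checkpoint, RestoreInit manager,
      Set<String> forceRestoreTasks) {
    try {
      manager.init(checkpoint, false);
    } catch (RuntimeException ex) {
      if (!causedByDeleted(ex)) {
        throw ex;
      }
      manager.init(checkpoint, true);
      forceRestoreTasks.add(taskName);
    }
  }
}
```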



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -208,9 +205,9 @@ public ContainerStorageManager(
   }
 
 
-  public void start() throws SamzaException, InterruptedException {
+  public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedException {

Review Comment:
   Same for later methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

