This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 1373db238 [SAMZA-2787] GetDeleted API and Recover from 
DeletedException (#1676)
1373db238 is described below

commit 1373db238683179123a15b246272cb0955f9b37c
Author: shekhars-li <[email protected]>
AuthorDate: Wed Aug 9 18:42:46 2023 -0700

    [SAMZA-2787] GetDeleted API and Recover from DeletedException (#1676)
    
    * GetDeleted API and Recover from DeletedException in commit for 
BlobStoreBackendFactory
    
    * Fix style issues, fix breaking test cases
    
    * Fixed test failure
    
    * Fix failing integration test - move storeConsumer start after init() in 
ContainerStorageManager
    
    * Style check fix
    
    * Bug fixes, integration test
    
    * Delete all blobs from deleted SnapshotIndex
    
    * clean up for final review
    
    * Add integ test - delete snapshotindex and recover
    
    * Fix failing integration tests related to init()
    
    * Cleanup - remove unused code and move some code to util methods
    
    * Unit test, minor refactoring
    
    * Review comments - 1st round
    
    * Refactor code
    Remove taskcheckpoints mutation
    
    * Address review comment
    
    * Address review comment part 2
    
    * Fix failing CSM test
    
    * Addressed final review comments - more logs, consistent naming
    
    * Checkstyle fix for test class
    
    ---------
    
    Co-authored-by: Shekhar Sharma <[email protected]>
    Co-authored-by: Shekhar Sharma <[email protected]>
---
 .../samza/storage/blobstore/BlobStoreManager.java  |   3 +-
 .../samza/storage/TaskStorageCommitManager.java    |  19 +-
 .../storage/blobstore/BlobStoreBackupManager.java  |  16 +-
 .../storage/blobstore/BlobStoreRestoreManager.java |  62 ++--
 .../storage/blobstore/util/BlobStoreUtil.java      | 151 ++++++---
 .../apache/samza/container/SamzaContainer.scala    |  32 +-
 .../org/apache/samza/container/TaskInstance.scala  |  50 +--
 .../samza/storage/ContainerStorageManager.java     | 110 ++-----
 .../ContainerStorageManagerRestoreUtil.java        | 361 +++++++++++++++++++++
 .../samza/storage/ContainerStorageManagerUtil.java |   3 +-
 .../storage/TestTaskStorageCommitManager.java      |  24 +-
 .../blobstore/TestBlobStoreBackupManager.java      |  16 +-
 .../blobstore/TestBlobStoreRestoreManager.java     |  31 +-
 .../storage/blobstore/util/TestBlobStoreUtil.java  |  93 +++---
 .../samza/container/TestSamzaContainer.scala       |   5 +-
 .../apache/samza/container/TestTaskInstance.scala  |   6 +-
 .../samza/storage/TestContainerStorageManager.java | 200 +++++++++++-
 .../TestTransactionalStateTaskBackupManager.java   |   2 +-
 .../samza/storage/MyStatefulApplication.java       |  25 +-
 .../kv/BlobStoreStateBackendIntegrationTest.java   | 182 +++++++++++
 .../samza/test/util/TestBlobStoreManager.java      |  41 ++-
 21 files changed, 1138 insertions(+), 294 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
 
b/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
index ef3f0c696..affa1f3d4 100644
--- 
a/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
+++ 
b/samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
@@ -49,11 +49,12 @@ public interface BlobStoreManager {
    * @param id Blob ID of the blob to get
    * @param outputStream OutputStream to write the downloaded blob
    * @param metadata User supplied {@link Metadata} of the request
+   * @param getDeletedBlob Flag to indicate if get should try to get a blob 
marked for deletion but not yet compacted
    * @return A future that completes when all the chunks are downloaded and 
written successfully to the OutputStream
    * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException 
returned future should complete
    *         exceptionally with DeletedException on failure with the blob 
already deleted error.
    */
-  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata 
metadata);
+  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata 
metadata, boolean getDeletedBlob);
 
   /**
    * Non-blocking call to mark a blob for deletion in the remote blob store
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 59b010172..5ed4f1e0b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -93,18 +93,21 @@ public class TaskStorageCommitManager {
     this.metrics = metrics;
   }
 
-  public void init() {
+  public void init(Checkpoint checkpoint) {
     // Assuming that container storage manager has already started and created 
to stores
     storageEngines = containerStorageManager.getAllStores(taskName);
-    if (checkpointManager != null) {
-      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
-      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, 
checkpoint);
-      stateBackendToBackupManager.values()
-          .forEach(storageBackupManager -> 
storageBackupManager.init(checkpoint));
+    final Checkpoint latestCheckpoint;
+    if (checkpoint == null && checkpointManager != null) {
+      latestCheckpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, 
latestCheckpoint);
     } else {
-      stateBackendToBackupManager.values()
-          .forEach(storageBackupManager -> storageBackupManager.init(null));
+      latestCheckpoint = checkpoint;
     }
+    stateBackendToBackupManager.values()
+        .forEach(storageBackupManager -> {
+          LOG.debug("Init for storageBackupManager {} with checkpoint {}", 
storageBackupManager, latestCheckpoint);
+          storageBackupManager.init(latestCheckpoint);
+        });
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index cb7ffad8a..5f87de95b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -133,14 +133,17 @@ public class BlobStoreBackupManager implements 
TaskBackupManager {
   @Override
   public void init(Checkpoint checkpoint) {
     long startTime = System.nanoTime();
-    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+    LOG.debug("Initializing blob store backup manager for task: {} with 
checkpoint {}", taskName, checkpoint);
 
     blobStoreManager.init();
 
     // Note: blocks the caller thread.
     // TODO LOW shesharma exclude stores that are no longer configured during 
init
+    // NOTE: Get SnapshotIndex with getDeleted set to false. A failure to get 
a blob from SnapshotIndex would restart
+    // the container and the init()/restore() should be able to create a new 
Snapshot in the blob store and recover.
+    // This helps with the rare race condition where SnapshotIndex was deleted 
after the restore completed successfully.
     Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
-        blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, 
checkpoint, new HashSet<>(storesToBackup));
+        blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, 
checkpoint, new HashSet<>(storesToBackup), false);
     this.prevStoreSnapshotIndexesFuture =
         
CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
     metrics.initNs.set(System.nanoTime() - startTime);
@@ -288,7 +291,7 @@ public class BlobStoreBackupManager implements 
TaskBackupManager {
         Metadata requestMetadata =
             new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, 
Optional.empty(), jobName, jobId, taskName, storeName);
         CompletionStage<SnapshotIndex> snapshotIndexFuture =
-            blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId, 
requestMetadata);
+            blobStoreUtil.getSnapshotIndex(snapshotIndexBlobId, 
requestMetadata, false);
 
         // 1. remove TTL of index blob and all of its files and sub-dirs 
marked for retention
         CompletionStage<Void> removeTTLFuture =
@@ -327,7 +330,12 @@ public class BlobStoreBackupManager implements 
TaskBackupManager {
     });
 
     return FutureUtil.allOf(removeTTLFutures, cleanupRemoteSnapshotFutures, 
removePrevRemoteSnapshotFutures)
-        .whenComplete((res, ex) -> metrics.cleanupNs.update(System.nanoTime() 
- startTime));
+        .whenComplete((res, ex) -> {
+          if (ex != null) {
+            LOG.error("Could not finish cleanup. checkpointid: {}, store SCMs: 
{}", checkpointId, storeSCMs, ex);
+          }
+          metrics.cleanupNs.update(System.nanoTime() - startTime);
+        });
   }
 
   @Override
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index 5010fd0c6..c370c62df 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -119,16 +119,30 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
 
   @Override
   public void init(Checkpoint checkpoint) {
+    // By default, init without retrying deleted SnapshotIndex blob. We want 
the init to fail if the SnapshotIndex blob
+    // was deleted. This allows us to mark the task for a full restore in 
restore().
+    init(checkpoint, false);
+  }
+
+  /**
+   * Initialize state resources such as store directories.
+   * NOTE: init can be called twice. In case init fails with DeletedException 
for the first time, it will be retried with
+   *       getDeleted set to true.
+   * @param checkpoint Current task checkpoint
+   * @param getDeleted Flag to get deleted SnapshotIndex blob
+   */
+  public void init(Checkpoint checkpoint, boolean getDeleted) {
     long startTime = System.nanoTime();
-    LOG.debug("Initializing blob store restore manager for task: {}", 
taskName);
+    LOG.debug("Initializing blob store restore manager for task {} with 
getDeleted {}", taskName, getDeleted);
 
     blobStoreManager.init();
 
     // get previous SCMs from checkpoint
-    prevStoreSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes(jobName, 
jobId, taskName, checkpoint, storesToRestore);
+    prevStoreSnapshotIndexes =
+        blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, 
checkpoint, storesToRestore, getDeleted);
     metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
-    LOG.trace("Found previous snapshot index during blob store restore manager 
init for task: {} to be: {}",
-        taskName, prevStoreSnapshotIndexes);
+    LOG.trace("Found previous snapshot index during blob store restore manager 
init for task: {} to be: {} with getDeleted set to {}",
+        taskName, prevStoreSnapshotIndexes, getDeleted);
 
     metrics.initStoreMetrics(storesToRestore);
 
@@ -150,7 +164,17 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
   @Override
   public CompletableFuture<Void> restore() {
     return restoreStores(jobName, jobId, taskModel.getTaskName(), 
storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
-        storageConfig, metrics, storageManagerUtil, blobStoreUtil, 
dirDiffUtil, executor);
+        storageConfig, metrics, storageManagerUtil, blobStoreUtil, 
dirDiffUtil, executor, false);
+  }
+
+  /**
+   * Restore state from checkpoints and state snapshots.
+   * @param restoreDeleted This flag forces the restore to always download the 
state from blob store with the get deleted
+   *                       flag enabled.
+   */
+  public CompletableFuture<Void> restore(boolean restoreDeleted) {
+    return restoreStores(jobName, jobId, taskModel.getTaskName(), 
storesToRestore, prevStoreSnapshotIndexes,
+        loggedBaseDir, storageConfig, metrics, storageManagerUtil, 
blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
   }
 
   @Override
@@ -186,17 +210,9 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
       if (!storesToBackup.contains(storeName) && 
!storesToRestore.contains(storeName)) {
         LOG.info("Removing task: {} store: {} from blob store. It is either no 
longer used, " +
             "or is no longer configured to be backed up or restored with blob 
store.", taskName, storeName);
-        DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
         Metadata requestMetadata =
             new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, 
Optional.empty(), jobName, jobId, taskName, storeName);
-        CompletionStage<Void> storeDeletionFuture =
-            blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete 
files and sub-dirs previously marked for removal
-                .thenComposeAsync(v ->
-                    blobStoreUtil.deleteDir(dirIndex, requestMetadata), 
executor) // deleted files and dirs still present
-                .thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob(
-                    scmAndSnapshotIndex.getLeft(), requestMetadata),
-                    executor); // delete the snapshot index blob
-        storeDeletionFutures.add(storeDeletionFuture);
+        
storeDeletionFutures.add(blobStoreUtil.cleanSnapshotIndex(scmAndSnapshotIndex.getLeft(),
 scmAndSnapshotIndex.getRight(), requestMetadata));
       }
     });
 
@@ -211,10 +227,9 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
       Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
       File loggedBaseDir, StorageConfig storageConfig, 
BlobStoreRestoreManagerMetrics metrics,
       StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, 
DirDiffUtil dirDiffUtil,
-      ExecutorService executor) {
+      ExecutorService executor, boolean getDeleted) {
     long restoreStartTime = System.nanoTime();
     List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
-
     LOG.debug("Starting restore for task: {} stores: {}", taskName, 
storesToRestore);
     storesToRestore.forEach(storeName -> {
       if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
@@ -254,7 +269,12 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
         throw new SamzaException(String.format("Error deleting store 
directory: %s", storeDir), e);
       }
 
-      boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, 
dirIndex,
+      // Restore from blob store if:
+      // 1. shouldRestore() returns true - there is a diff between local and 
remote snapshot.
+      // 2. getDeleted is set - Some blobs in the blob store were deleted 
incorrectly (SAMZA-2787). Download/restore
+      //                             everything locally ignoring the diff. 
This will be backed up afresh by
+      //                             ContainerStorageManager recovery path.
+      boolean shouldRestore = getDeleted || 
shouldRestore(taskName.getTaskName(), storeName, dirIndex,
           storeCheckpointDir, storageConfig, dirDiffUtil);
 
       if (shouldRestore) { // restore the store from the remote blob store
@@ -268,7 +288,7 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
 
         metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - 
storeRestoreStartTime);
         enqueueRestore(jobName, jobId, taskName.toString(), storeName, 
storeDir, dirIndex, storeRestoreStartTime,
-            restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor);
+            restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, 
getDeleted);
       } else {
         LOG.debug("Renaming store checkpoint directory: {} to store directory: 
{} since its contents are identical " +
             "to the remote snapshot.", storeCheckpointDir, storeDir);
@@ -329,11 +349,11 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
   @VisibleForTesting
   static void enqueueRestore(String jobName, String jobId, String taskName, 
String storeName, File storeDir, DirIndex dirIndex,
       long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures, 
BlobStoreUtil blobStoreUtil,
-      DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, 
ExecutorService executor) {
+      DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, 
ExecutorService executor, boolean getDeleted) {
 
     Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), 
Optional.empty(), jobName, jobId, taskName, storeName);
     CompletableFuture<Void> restoreFuture =
-        blobStoreUtil.restoreDir(storeDir, dirIndex, 
requestMetadata).thenRunAsync(() -> {
+        blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata, 
getDeleted).thenRunAsync(() -> {
           metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - 
storeRestoreStartTime);
 
           long postRestoreStartTime = System.nanoTime();
@@ -345,7 +365,7 @@ public class BlobStoreRestoreManager implements 
TaskRestoreManager {
                     storeDir.getAbsolutePath()));
           } else {
             metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - 
postRestoreStartTime);
-            LOG.info("Restore from remote snapshot completed for store: {}", 
storeDir);
+            LOG.info("Restore from remote snapshot completed for store: {} 
with getDeleted set to {}", storeDir, getDeleted);
           }
         }, executor);
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index fb7104ed1..327eb7f33 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -110,10 +110,11 @@ public class BlobStoreUtil {
    * @param checkpoint {@link Checkpoint} instance to get the store state 
checkpoint markers from. Only
    *                   {@link CheckpointV2} and newer are supported for blob 
stores.
    * @param storesToBackupOrRestore set of store names to be backed up or 
restored
+   * @param getDeleted tries gets a deleted but not yet compacted 
SnapshotIndex from the blob store.
    * @return Map of store name to its blob id of snapshot indices and their 
corresponding snapshot indices for the task.
    */
-  public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
-      String jobName, String jobId, String taskName, Checkpoint checkpoint, 
Set<String> storesToBackupOrRestore) {
+  public Map<String, Pair<String, SnapshotIndex>> 
getStoreSnapshotIndexes(String jobName, String jobId, String taskName,
+      Checkpoint checkpoint, Set<String> storesToBackupOrRestore, boolean 
getDeleted) {
     //TODO MED shesharma document error handling (checkpoint ver, blob not 
found, getBlob)
     if (checkpoint == null) {
       LOG.debug("No previous checkpoint found for taskName: {}", taskName);
@@ -136,11 +137,12 @@ public class BlobStoreUtil {
       storeSnapshotIndexBlobIds.forEach((storeName, snapshotIndexBlobId) -> {
         if (storesToBackupOrRestore.contains(storeName)) {
           try {
-            LOG.debug("Getting snapshot index for taskName: {} store: {} 
blobId: {}", taskName, storeName, snapshotIndexBlobId);
+            LOG.debug("Getting snapshot index for taskName: {} store: {} 
blobId: {} with getDeleted set to {}",
+                taskName, storeName, snapshotIndexBlobId, getDeleted);
             Metadata requestMetadata =
                 new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, 
Optional.empty(), jobName, jobId, taskName, storeName);
             CompletableFuture<SnapshotIndex> snapshotIndexFuture =
-                getSnapshotIndex(snapshotIndexBlobId, 
requestMetadata).toCompletableFuture();
+                getSnapshotIndex(snapshotIndexBlobId, requestMetadata, 
getDeleted).toCompletableFuture();
             Pair<CompletableFuture<String>, CompletableFuture<SnapshotIndex>> 
pairOfFutures =
                 
Pair.of(CompletableFuture.completedFuture(snapshotIndexBlobId), 
snapshotIndexFuture);
 
@@ -162,15 +164,7 @@ public class BlobStoreUtil {
     }
 
     try {
-      return FutureUtil.toFutureOfMap(t -> {
-        Throwable unwrappedException = 
FutureUtil.unwrapExceptions(CompletionException.class, t);
-        if (unwrappedException instanceof DeletedException) {
-          LOG.warn("Ignoring already deleted snapshot index for taskName: {}", 
taskName, t);
-          return true;
-        } else {
-          return false;
-        }
-      }, storeSnapshotIndexFutures).join();
+      return FutureUtil.toFutureOfMap(storeSnapshotIndexFutures).join();
     } catch (Exception e) {
       throw new SamzaException(
           String.format("Error while waiting to get store snapshot indexes for 
task %s", taskName), e);
@@ -182,12 +176,12 @@ public class BlobStoreUtil {
    * @param blobId blob ID of the {@link SnapshotIndex} to get
    * @return a Future containing the {@link SnapshotIndex}
    */
-  public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, 
Metadata metadata) {
+  public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, 
Metadata metadata, boolean getDeleted) {
     Preconditions.checkState(StringUtils.isNotBlank(blobId));
     String opName = "getSnapshotIndex: " + blobId;
     return FutureUtil.executeAsyncWithRetries(opName, () -> {
       ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // 
no need to close ByteArrayOutputStream
-      return blobStoreManager.get(blobId, indexBlobStream, 
metadata).toCompletableFuture()
+      return blobStoreManager.get(blobId, indexBlobStream, metadata, 
getDeleted).toCompletableFuture()
           .thenApplyAsync(f -> 
snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor);
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
@@ -210,6 +204,51 @@ public class BlobStoreUtil {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Gets SnapshotIndex blob, cleans up a SnapshotIndex by recursively 
deleting all blobs associated with files/subdirs
+   * inside the SnapshotIndex and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob id of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeleted Determines whether to try to get deleted SnapshotIndex 
or not.
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
Metadata requestMetadata, boolean getDeleted) {
+    Metadata getSnapshotRequest = new 
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), 
requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), 
requestMetadata.getStoreName());
+    return getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest, 
getDeleted)
+        .thenCompose(snapshotIndex -> cleanSnapshotIndex(snapshotIndexBlobId, 
snapshotIndex, requestMetadata));
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated 
with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param snapshotIndex SnapshotIndex to delete
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+    DirIndex dirIndex = snapshotIndex.getDirIndex();
+    CompletionStage<Void> storeDeletionFuture =
+        cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs 
previously marked for removal
+            .thenComposeAsync(v ->
+                deleteDir(dirIndex, requestMetadata), executor) // deleted 
files and dirs still present
+            .thenComposeAsync(v -> 
deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // 
delete the snapshot index blob
+            .exceptionally(ex -> {
+              Throwable unwrappedException = 
FutureUtil.unwrapExceptions(CompletionException.class,
+                  FutureUtil.unwrapExceptions(SamzaException.class, ex));
+              // If a blob is already deleted, do not fail -> this may happen 
if after we restore a
+              // deleted checkpoint and then try to clean up old checkpoint.
+              if (unwrappedException instanceof DeletedException) {
+                LOG.warn("Request {} received DeletedException on trying to 
clean up SnapshotIndex {}. Ignoring the error.",
+                    requestMetadata, snapshotIndexBlobId);
+                return null;
+              }
+              String msg = String.format("Request %s received error 
deleting/cleaning up SnapshotIndex: %s",
+                  requestMetadata, snapshotIndexBlobId);
+              throw new SamzaException(msg, ex);
+            });
+    return storeDeletionFuture;
+  }
+
   /**
    * WARNING: This method deletes the **SnapshotIndex blob** from the 
snapshot. This should only be called to clean
    * up an older snapshot **AFTER** all the files and sub-dirs to be deleted 
from this snapshot are already deleted
@@ -229,10 +268,11 @@ public class BlobStoreUtil {
   /**
    * Non-blocking restore of a {@link SnapshotIndex} to local store by 
downloading all the files and sub-dirs associated
    * with this remote snapshot.
+   * NOTE: getDeleted flag sets if it reattempts to get a deleted file by 
setting getDeleted flag in getFiles.
    * @return A future that completes when all the async downloads completes
    */
-  public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex, 
Metadata metadata) {
-    LOG.debug("Restoring contents of directory: {} from remote snapshot.", 
baseDir);
+  public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex, 
Metadata metadata, boolean getDeleted) {
+    LOG.debug("Restoring contents of directory: {} from remote snapshot. 
GetDeletedFiles set to: {}", baseDir, getDeleted);
 
     List<CompletableFuture<Void>> downloadFutures = new ArrayList<>();
 
@@ -255,7 +295,7 @@ public class BlobStoreUtil {
 
       String opName = "restoreFile: " + fileToRestore.getAbsolutePath();
       CompletableFuture<Void> fileRestoreFuture =
-          FutureUtil.executeAsyncWithRetries(opName, () -> getFile(fileBlobs, 
fileToRestore, requestMetadata),
+          FutureUtil.executeAsyncWithRetries(opName, () -> getFile(fileBlobs, 
fileToRestore, requestMetadata, getDeleted),
               isCauseNonRetriable(), executor, retryPolicyConfig);
       downloadFutures.add(fileRestoreFuture);
     }
@@ -264,7 +304,7 @@ public class BlobStoreUtil {
     List<DirIndex> subDirs = dirIndex.getSubDirsPresent();
     for (DirIndex subDir : subDirs) {
       File subDirFile = Paths.get(baseDir.getAbsolutePath(), 
subDir.getDirName()).toFile();
-      downloadFutures.add(restoreDir(subDirFile, subDir, metadata));
+      downloadFutures.add(restoreDir(subDirFile, subDir, metadata, 
getDeleted));
     }
 
     return FutureUtil.allOf(downloadFutures);
@@ -397,10 +437,11 @@ public class BlobStoreUtil {
    * @param fileBlobs List of {@link FileBlob}s that constitute this file.
    * @param fileToRestore File pointing to the local path where the file will 
be restored.
    * @param requestMetadata {@link Metadata} associated with this request
+   * @param getDeleted Flag that indicates whether to try to get Deleted (but 
not yet compacted) files.
    * @return a future that completes when the file is downloaded and written 
or if an exception occurs.
    */
   @VisibleForTesting
-  CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File 
fileToRestore, Metadata requestMetadata) {
+  CompletableFuture<Void> getFile(List<FileBlob> fileBlobs, File 
fileToRestore, Metadata requestMetadata, boolean getDeleted) {
     FileOutputStream outputStream = null;
     try {
       long restoreFileStartTime = System.nanoTime();
@@ -422,9 +463,9 @@ public class BlobStoreUtil {
       CompletableFuture<Void> resultFuture = 
CompletableFuture.completedFuture(null);
       for (FileBlob fileBlob : fileBlobsCopy) {
         resultFuture = resultFuture.thenComposeAsync(v -> {
-          LOG.debug("Starting restore for file: {} with blob id: {} at offset: 
{}", fileToRestore, fileBlob.getBlobId(),
-              fileBlob.getOffset());
-          return blobStoreManager.get(fileBlob.getBlobId(), finalOutputStream, 
requestMetadata);
+          LOG.debug("Starting restore for file: {} with blob id: {} at offset: 
{} with getDeleted set to: {}",
+              fileToRestore, fileBlob.getBlobId(), fileBlob.getOffset(), 
getDeleted);
+          return blobStoreManager.get(fileBlob.getBlobId(), finalOutputStream, 
requestMetadata, getDeleted);
         }, executor);
       }
 
@@ -565,6 +606,44 @@ public class BlobStoreUtil {
     return CompletableFuture.allOf(deleteFutures.toArray(new 
CompletableFuture[0]));
   }
 
+  /**
+   * Get the {@link SnapshotIndex} using the blob id and marks all the blobs 
associated with it to never expire,
+   * including the SnapshotIndex itself.
+   * @param indexBlobId Blob id of {@link SnapshotIndex}
+   * @param metadata {@link Metadata} related to the request
+   * @return A future that completes when all the files and subdirs associated 
with this remote snapshot, as well as
+   * the {@link SnapshotIndex} associated with the snapshot are marked to 
never expire.
+   */
+  public CompletableFuture<Void> removeTTLForSnapshotIndex(String indexBlobId, 
Metadata metadata) {
+    return getSnapshotIndex(indexBlobId, metadata, false)
+        .thenCompose(snapshotIndex -> removeTTL(indexBlobId, snapshotIndex, 
metadata));
+  }
+
+  /**
+   * Marks all the blobs associated with an {@link SnapshotIndex} to never 
expire, including the SnapshotIndex
+   * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot
+   * @param metadata {@link Metadata} related to the request
+   * @return A future that completes when all the files and subdirs associated 
with this remote snapshot are marked to
+   * never expire.
+   */
+  public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex 
snapshotIndex, Metadata metadata) {
+    SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata();
+    LOG.debug("Marking contents of SnapshotIndex: {} to never expire", 
snapshotMetadata.toString());
+
+    String opName = "removeTTL for SnapshotIndex for checkpointId: " + 
snapshotMetadata.getCheckpointId();
+    Supplier<CompletionStage<Void>> removeDirIndexTTLAction =
+      () -> removeTTL(snapshotIndex.getDirIndex(), 
metadata).toCompletableFuture();
+    CompletableFuture<Void> dirIndexTTLRemovalFuture =
+        FutureUtil.executeAsyncWithRetries(opName, removeDirIndexTTLAction, 
isCauseNonRetriable(), executor, retryPolicyConfig);
+
+    return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> {
+      String op2Name = "removeTTL for indexBlobId: " + indexBlobId;
+      Supplier<CompletionStage<Void>> removeIndexBlobTTLAction =
+        () -> blobStoreManager.removeTTL(indexBlobId, 
metadata).toCompletableFuture();
+      return FutureUtil.executeAsyncWithRetries(op2Name, 
removeIndexBlobTTLAction, isCauseNonRetriable(), executor, retryPolicyConfig);
+    }, executor);
+  }
+
   /**
    * Recursively mark all the blobs associated with the {@link DirIndex} to 
never expire (remove TTL).
    * @param dirIndex the {@link DirIndex} whose contents' TTL needs to be 
removed
@@ -603,32 +682,6 @@ public class BlobStoreUtil {
     return CompletableFuture.allOf(updateTTLsFuture.toArray(new 
CompletableFuture[0]));
   }
 
-
-  /**
-   * Marks all the blobs associated with an {@link SnapshotIndex} to never 
expire.
-   * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot
-   * @param metadata {@link Metadata} related to the request
-   * @return A future that completes when all the files and subdirs associated 
with this remote snapshot are marked to
-   * never expire.
-   */
-  public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex 
snapshotIndex, Metadata metadata) {
-    SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata();
-    LOG.debug("Marking contents of SnapshotIndex: {} to never expire", 
snapshotMetadata.toString());
-
-    String opName = "removeTTL for SnapshotIndex for checkpointId: " + 
snapshotMetadata.getCheckpointId();
-    Supplier<CompletionStage<Void>> removeDirIndexTTLAction =
-      () -> removeTTL(snapshotIndex.getDirIndex(), 
metadata).toCompletableFuture();
-    CompletableFuture<Void> dirIndexTTLRemovalFuture =
-        FutureUtil.executeAsyncWithRetries(opName, removeDirIndexTTLAction, 
isCauseNonRetriable(), executor, retryPolicyConfig);
-
-    return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> {
-      String op2Name = "removeTTL for indexBlobId: " + indexBlobId;
-      Supplier<CompletionStage<Void>> removeIndexBlobTTLAction =
-        () -> blobStoreManager.removeTTL(indexBlobId, 
metadata).toCompletableFuture();
-      return FutureUtil.executeAsyncWithRetries(op2Name, 
removeIndexBlobTTLAction, isCauseNonRetriable(), executor, retryPolicyConfig);
-    }, executor);
-  }
-
   private static Predicate<Throwable> isCauseNonRetriable() {
     return throwable -> {
       Throwable unwrapped = 
FutureUtil.unwrapExceptions(CompletionException.class, throwable);
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index d9cbce974..999bdb7dd 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -30,7 +30,7 @@ import java.util.{Base64, Optional}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.SamzaException
 import org.apache.samza.application.ApplicationUtil
-import org.apache.samza.checkpoint.{CheckpointListener, OffsetManager, 
OffsetManagerMetrics}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointListener, 
OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.clustermanager.StandbyTaskUtil
 import org.apache.samza.config.{StreamConfig, _}
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
@@ -780,11 +780,11 @@ class SamzaContainer(
       // TODO HIGH pmaheshw SAMZA-2338: since store restore needs to trim 
changelog messages,
       // need to start changelog producers before the stores, but stop them 
after stores.
       startProducers
-      startStores
+      val taskCheckpoints = startStores
       startTableManager
       startDiskSpaceMonitor
       startHostStatisticsMonitor
-      startTask
+      startTask(taskCheckpoints)
       startConsumers
       startSecurityManger
 
@@ -981,7 +981,14 @@ class SamzaContainer(
     }
   }
 
-  def startStores {
+  /**
+   * Starts all the stores by restoring and recreating the stores, if necessary
+   * @return Returns the latest checkpoint associated with each task. This 
checkpoint does not have Startpoint applied
+   *         over it.
+   *         Note: In case of blob store manager, returned checkpoints for a 
task may contain a checkpoint recreated on
+   *         blob store, in case the previous one was recently deleted. More 
details in SAMZA-2787
+   */
+  def startStores: util.Map[TaskName, Checkpoint] = {
     info("Starting container storage manager.")
     containerStorageManager.start()
   }
@@ -993,10 +1000,21 @@ class SamzaContainer(
     })
   }
 
-  def startTask {
-    info("Initializing stream tasks.")
+  /**
+   * Init all task instances
+   * @param taskCheckpoints last checkpoint for a TaskName. This last 
checkpoint could be different from the one returned
+   *                        from CheckpointManager#getLastCheckpoint. The new 
checkpoint could be created in case the last
+   *                        checkpoint was recently deleted and 
BlobStoreManager could recover it. More details in
+   *                        SAMZA-2787
+   */
+  def startTask(taskCheckpoints: util.Map[TaskName, Checkpoint]) {
+    info("Initializing stream tasks with taskCheckpoints %s." 
format(taskCheckpoints) )
 
-    taskInstances.values.foreach(_.initTask)
+    taskInstances.keys.foreach { taskName =>
+      val taskInstance = taskInstances(taskName)
+      val checkpoint = taskCheckpoints.asScala.getOrElse(taskName, null)
+      taskInstance.initTask(Some(checkpoint))
+    }
   }
 
   def startAdmins {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5f35ce21e..285e7c877 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -24,10 +24,10 @@ import java.util.{Collections, Objects, Optional}
 import java.util.concurrent.{CompletableFuture, ExecutorService, 
ScheduledExecutorService, Semaphore, TimeUnit}
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.kafka.{KafkaChangelogSSPOffset, 
KafkaStateCheckpointMarker}
-import org.apache.samza.checkpoint.{CheckpointId, CheckpointV1, CheckpointV2, 
OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointV1, 
CheckpointV2, OffsetManager}
 import org.apache.samza.config.{Config, JobConfig, StreamConfig, TaskConfig}
 import org.apache.samza.context._
-import org.apache.samza.job.model.{JobModel, TaskModel}
+import org.apache.samza.job.model.{JobModel, TaskMode, TaskModel}
 import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, 
ScheduledCallback}
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.{ContainerStorageManager, 
TaskStorageCommitManager}
@@ -149,31 +149,41 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Option[Checkpoint]) {
     initCaughtUpMapping()
 
-    if (commitManager != null) {
-      debug("Starting commit manager for taskName: %s" format taskName)
+    val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+    var checkpoint: Checkpoint = lastTaskCheckpoint.orNull
+    debug("initTask with optional checkpoint %s" format(checkpoint))
 
-      commitManager.init()
+    if (commitManager != null) {
+      debug("Starting commit manager for taskName %s" format taskName)
+      if (isStandByTask) {
+        debug("Passing null to init for standby taskName %s" format taskName)
+        // pass null in case of a standby task. This ensures the checkpoint is 
always read from the checkpoint topic
+        commitManager.init(null)
+      } else {
+        debug("init taskName %s with Checkpoint %s" format(taskName, 
checkpoint))
+        commitManager.init(checkpoint)
+      }
     } else {
       debug("Skipping commit manager initialization for taskName: %s" format 
taskName)
     }
 
-    if (offsetManager != null) {
-      val checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
-      // Only required for checkpointV2
-      if (checkpoint != null && checkpoint.getVersion == 2) {
-        val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
-        // call cleanUp on backup managers in case the container previously 
failed during commit
-        // before completing this step
-
-        // WARNING: cleanUp is NOT optional with blob stores since this is 
where we reset the TTL for
-        // tracked blobs. if this TTL reset is skipped, some of the blobs 
retained by future commits may
-        // be deleted in the background by the blob store, leading to data 
loss.
-        info("Cleaning up stale state from previous run for taskName: %s" 
format taskName)
-        commitManager.cleanUp(checkpointV2.getCheckpointId, 
checkpointV2.getStateCheckpointMarkers)
-      }
+    if (offsetManager != null && isStandByTask) {
+      checkpoint = offsetManager.getLastTaskCheckpoint(taskName)
+    }
+    // Only required for checkpointV2
+    if (checkpoint != null && checkpoint.getVersion == 2) {
+      val checkpointV2 = checkpoint.asInstanceOf[CheckpointV2]
+      // call cleanUp on backup managers in case the container previously 
failed during commit
+      // before completing this step
+
+      // WARNING: cleanUp is NOT optional with blob stores since this is where 
we reset the TTL for
+      // tracked blobs. if this TTL reset is skipped, some of the blobs 
retained by future commits may
+      // be deleted in the background by the blob store, leading to data loss.
+      info("Cleaning up stale state from previous run for taskName: %s" format 
taskName)
+      commitManager.cleanUp(checkpointV2.getCheckpointId, 
checkpointV2.getStateCheckpointMarkers)
     }
 
     if (taskConfig.getTransactionalStateRestoreEnabled() && 
taskConfig.getCommitMs > 0) {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index c9ee065c0..f46a823e3 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -22,17 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
@@ -48,7 +45,6 @@ import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.metrics.Gauge;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeManager;
 import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
@@ -207,10 +203,14 @@ public class ContainerStorageManager {
         new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
   }
 
-
-  public void start() throws SamzaException, InterruptedException {
+  /**
+   * Starts all the task stores.
+   * Returns the latest checkpoint for each task. This checkpoint may be 
different from the lastCheckpoint returned by
+   * checkpoint manager in case of a BlobStoreRestoreManager.
+   */
+  public Map<TaskName, Checkpoint> start() throws SamzaException, 
InterruptedException {
     // Restores and recreates stores.
-    restoreStores();
+    Map<TaskName, Checkpoint> taskCheckpoints = restoreStores();
 
     // Shutdown restore executor since it will no longer be used
     try {
@@ -248,10 +248,11 @@ public class ContainerStorageManager {
     });
 
     isStarted = true;
+    return taskCheckpoints;
   }
 
   // Restoration of all stores, in parallel across tasks
-  private void restoreStores() throws InterruptedException {
+  private Map<TaskName, Checkpoint> restoreStores() throws 
InterruptedException {
     LOG.info("Store Restore started");
     Set<TaskName> activeTasks = 
ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Active).keySet();
     // Find all non-side input stores
@@ -263,6 +264,7 @@ public class ContainerStorageManager {
     // Obtain the checkpoints for each task
     Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new 
HashMap<>();
     Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
+    Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames = 
new HashMap<>();
     containerModel.getTasks().forEach((taskName, taskModel) -> {
       Checkpoint taskCheckpoint = null;
       if (checkpointManager != null && activeTasks.contains(taskName)) {
@@ -301,79 +303,30 @@ public class ContainerStorageManager {
               samzaContainerMetrics, taskInstanceMetrics, 
taskInstanceCollectors, serdes,
               loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, 
clock);
       taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+      taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
     });
 
-    // Initialize each TaskStorageManager
-    taskRestoreManagers.forEach((taskName, restoreManagers) ->
-        restoreManagers.forEach((factoryName, taskRestoreManager) ->
-            taskRestoreManager.init(taskCheckpoints.get(taskName))
-        )
-    );
-
-    // Start each store consumer once.
-    // Note: These consumers are per system and only changelog system store 
consumers will be started.
-    // Some TaskRestoreManagers may not require the consumer to to be started, 
but due to the agnostic nature of
-    // ContainerStorageManager we always start the changelog consumer here in 
case it is required
-    
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
-    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
-    // Submit restore callable for each taskInstance
-    taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
-      // Submit for each restore factory
-      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
-        long startTime = System.currentTimeMillis();
-        String taskName = taskInstance.getTaskName();
-        LOG.info("Starting restore for state for task: {}", taskName);
-        CompletableFuture<Void> restoreFuture = 
taskRestoreManager.restore().handle((res, ex) -> {
-          // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} 
restore",
-                taskName, ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be be able to 
continue processing/backups
-            // if restore manager close fails.
-          }
-
-          long timeToRestore = System.currentTimeMillis() - startTime;
-          if (samzaContainerMetrics != null) {
-            Gauge taskGauge = 
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, 
null);
-
-            if (taskGauge != null) {
-              taskGauge.set(timeToRestore);
-            }
-          }
-
-          if (ex != null) {
-            // log and rethrow exception to communicate restore failure
-            String msg = String.format("Error restoring state for task: %s", 
taskName);
-            LOG.error(msg, ex);
-            throw new SamzaException(msg, ex); // wrap in unchecked exception 
to throw from lambda
-          } else {
-            return null;
-          }
-        });
-
-        taskRestoreFutures.add(restoreFuture);
-      });
-    });
+    // Init all taskRestores and if successful, restores all the task stores 
concurrently
+    LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
+    CompletableFuture<Map<TaskName, Checkpoint>> 
initRestoreAndNewCheckpointFuture =
+        
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(taskRestoreManagers,
 samzaContainerMetrics,
+            checkpointManager, jobContext, containerModel, taskCheckpoints, 
taskBackendFactoryToStoreNames, config,
+            restoreExecutor, taskInstanceMetrics, loggedStoreBaseDirectory, 
storeConsumers);
 
-    // Loop-over the future list to wait for each restore to finish, catch any 
exceptions during restore and throw
-    // as samza exceptions
-    for (Future<Void> future : taskRestoreFutures) {
-      try {
-        future.get();
-      } catch (InterruptedException e) {
-        LOG.warn("Received an interrupt during store restoration. Interrupting 
the restore executor to exit "
-            + "prematurely without restoring full state.");
-        restoreExecutor.shutdownNow();
-        throw e;
-      } catch (Exception e) {
-        LOG.error("Exception when restoring state.", e);
-        throw new SamzaException("Exception when restoring state.", e);
-      }
+    // Update the task checkpoints map, if it was updated during the restore. 
Throw an exception if the restore or
+    // creating a new checkpoint (in case of BlobStoreBackendFactory) failed.
+    try {
+      Map<TaskName, Checkpoint> newTaskCheckpoints = 
initRestoreAndNewCheckpointFuture.get();
+      taskCheckpoints.putAll(newTaskCheckpoints);
+      LOG.debug("Post init and restore checkpoints is: {}. NewTaskCheckpoints 
are: {}", taskCheckpoints, newTaskCheckpoints);
+    } catch (InterruptedException e) {
+      LOG.warn("Received an interrupt during store restoration. Interrupting 
the restore executor to exit "
+          + "prematurely without restoring full state.");
+      restoreExecutor.shutdownNow();
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Exception when restoring state.", e);
+      throw new SamzaException("Exception when restoring state.", e);
     }
 
     // Stop each store consumer once
@@ -400,6 +353,7 @@ public class ContainerStorageManager {
     });
 
     LOG.info("Store Restore complete");
+    return taskCheckpoints;
   }
 
   /**
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
new file mode 100644
index 000000000..db0a5be46
--- /dev/null
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.commons.io.FileUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.config.BlobStoreConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.storage.blobstore.BlobStoreBackupManager;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+import org.apache.samza.storage.blobstore.BlobStoreRestoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.exceptions.DeletedException;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import 
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ContainerStorageManagerRestoreUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ContainerStorageManagerRestoreUtil.class);
+
+  /**
+   * Inits and Restores all the task stores.
+   * Note: In case of {@link BlobStoreRestoreManager}, this method retries 
init and restore with getDeleted flag if it
+   * receives a {@link DeletedException}. This will create a new checkpoint 
for the corresponding task.
+   */
+  public static CompletableFuture<Map<TaskName, Checkpoint>> 
initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, 
SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, 
ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, 
Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskStorageManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager 
instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the 
task to be restored with GetDeleted as well.
+              // This ensures that the restore downloads the snapshot, 
recreates a new snapshot, uploads it to the blob store
+              // and creates a new checkpoint.
+              ((BlobStoreRestoreManager) 
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+              forceRestoreTasks.add(taskName.getTaskName());
+            } else {
+              // log and rethrow exception to communicate restore failure
+              String msg = String.format("init failed for 
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+              LOG.error(msg, ex);
+              throw new SamzaException(msg, ex);
+            }
+          }
+        })
+    );
+
+    // Start each store consumer once.
+    // Note: These consumers are per system and only changelog system store 
consumers will be started.
+    // Some TaskRestoreManagers may not require the consumer to be started, 
but due to the agnostic nature of
+    // ContainerStorageManager we always start the changelog consumer here in 
case it is required
+    storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+    return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints, 
taskBackendFactoryToStoreNames, jobContext,
+        containerModel, samzaContainerMetrics, checkpointManager, config, 
taskInstanceMetrics, executor, loggerStoreDir,
+        forceRestoreTasks);
+  }
+
+  /**
+   * Restores all TaskInstances and returns a future for each TaskInstance 
restore. Note: In case of
+   * {@link BlobStoreRestoreManager}, this method retries restore with the 
getDeleted flag if it receives a
+   * {@link DeletedException}. This will create a new Checkpoint.
+   */
+  private static CompletableFuture<Map<TaskName, Checkpoint>> 
restoreAllTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, 
Map<TaskName, Checkpoint> taskCheckpoints,
+      Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames, 
JobContext jobContext,
+      ContainerModel containerModel, SamzaContainerMetrics 
samzaContainerMetrics, CheckpointManager checkpointManager,
+      Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
ExecutorService executor,
+      File loggedStoreDir, Set<String> forceRestoreTask) {
+
+    Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new 
ConcurrentHashMap<>();
+    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+    // Submit restore callable for each taskInstance
+    taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+      // Submit for each restore factory
+      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+        long startTime = System.currentTimeMillis();
+        String taskName = taskInstanceName.getTaskName();
+        LOG.info("Starting restore for state for task: {}", taskName);
+
+        CompletableFuture<Void> restoreFuture;
+        if (forceRestoreTask.contains(taskName) && taskRestoreManager 
instanceof BlobStoreRestoreManager) {
+          // If init was retried with getDeleted, force restore with 
getDeleted as well, since init only inits the
+          // restoreManager with deleted SnapshotIndex but does not retry to 
recover the deleted blobs and delegates it
+          // to restore().
+          // Create an empty future that fails immediately with 
DeletedException to force retry in restore.
+          restoreFuture = new CompletableFuture<>();
+          restoreFuture.completeExceptionally(new SamzaException(new 
DeletedException()));
+        } else {
+          restoreFuture = taskRestoreManager.restore();
+        }
+
+        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
+          updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
+
+          if (ex != null) {
+            if (isUnwrappedExceptionDeletedException(ex)) {
+              LOG.warn("Received DeletedException during restore for task {}. 
Attempting to get blobs with getDeleted flag",
+                  taskInstanceName.getTaskName());
+
+              // Try to restore with getDeleted flag
+              CompletableFuture<Checkpoint> future =
+                  restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
+                      checkpointManager, taskRestoreManager, config, 
taskInstanceMetrics, executor,
+                      
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), 
loggedStoreDir,
+                      jobContext, containerModel);
+              try {
+                newTaskCheckpoints.put(taskInstanceName, future);
+              } catch (Exception e) {
+                String msg = String.format("DeletedException during restore 
task: %s after retrying to get deleted blobs.", taskName);
+                throw new SamzaException(msg, e);
+              } finally {
+                updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
+              }
+            } else {
+              // log and rethrow exception to communicate restore failure
+              String msg = String.format("Error restoring state for task: %s", 
taskName);
+              LOG.error(msg, ex);
+              throw new SamzaException(msg, ex); // wrap in unchecked 
exception to throw from lambda
+            }
+          }
+
+          // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
+          // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
+          try {
+            taskRestoreManager.close();
+          } catch (Exception e) {
+            LOG.error("Error closing restore manager for task: {} after {} 
restore", taskName,
+                ex != null ? "unsuccessful" : "successful", e);
+            // ignore exception from close. container may still be able to 
continue processing/backups
+            // if restore manager close fails.
+          }
+          return null;
+        });
+        taskRestoreFutures.add(taskRestoreFuture);
+      });
+    });
+    CompletableFuture<Void> restoreFutures = 
CompletableFuture.allOf(taskRestoreFutures.toArray(new CompletableFuture[0]));
+    return restoreFutures.thenCompose(ignoredVoid -> 
FutureUtil.toFutureOfMap(newTaskCheckpoints));
+  }
+
+  /**
  /**
   * Returns a single future that guarantees all the following are completed, in this order:
   *   1. Restore state locally by getting deleted blobs from the blob store.
   *   2. Create a new snapshot from restored state by backing it up on the blob store.
   *   3. Remove TTL from the new Snapshot and all the associated blobs in the blob store
   *   4. Clean up old/deleted Snapshot
   *   5. Create and write the new checkpoint to checkpoint topic
   *
   * @param taskName task whose deleted snapshot is being recovered
   * @param taskCheckpoints last known checkpoint per task; the entry for {@code taskName} must be a
   *                        {@link CheckpointV2} with blob store SCMs (the cast below will fail otherwise)
   * @param checkpointManager used in step 5 to read the last checkpoint and write the new one
   * @param taskRestoreManager must be a {@link BlobStoreRestoreManager}; this method casts it unconditionally
   * @param config job config; used to build blob store manager/config
   * @param taskInstanceMetrics per-task metrics; may lack an entry for {@code taskName} (StorageRecovery case)
   * @param executor executor for blob store operations
   * @param storesToRestore stores to restore and then back up as the new snapshot
   * @param loggedStoreBaseDirectory base dir of local logged store state
   * @param jobContext provides the job model for the backup manager
   * @param containerModel provides the task model for the backup manager
   * @return future completing with the newly written checkpoint; completes exceptionally with a
   *         {@link SamzaException} wrapping the cause if any stage fails
   */
  private static CompletableFuture<Checkpoint> restoreDeletedSnapshot(TaskName taskName,
      Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager checkpointManager,
      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
      ExecutorService executor, Set<String> storesToRestore, File loggedStoreBaseDirectory, JobContext jobContext,
      ContainerModel containerModel) {

    // if taskInstanceMetrics are specified use those for store metrics,
    // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
    MetricsRegistry metricsRegistry =
        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry()
            : new MetricsRegistryMap();

    BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
    JobConfig jobConfig = new JobConfig(config);
    BlobStoreUtil blobStoreUtil =
        new BlobStoreUtil(blobStoreManager, executor, new BlobStoreConfig(config), null,
            new BlobStoreRestoreManagerMetrics(metricsRegistry));

    // NOTE(review): unconditional cast — this path is only reached for the blob store backend factory,
    // where the restore manager is always a BlobStoreRestoreManager.
    BlobStoreRestoreManager blobStoreRestoreManager = (BlobStoreRestoreManager) taskRestoreManager;

    CheckpointId checkpointId = CheckpointId.create();
    // SCMs of the old (deleted) snapshot, needed for cleanup in step 4.
    Map<String, String> oldSCMs = ((CheckpointV2) taskCheckpoints.get(taskName)).getStateCheckpointMarkers()
        .get(BlobStoreStateBackendFactory.class.getName());

    // 1. Restore state with getDeleted flag set to true
    CompletableFuture<Void> restoreFuture = blobStoreRestoreManager.restore(true);

    // 2. Create a new checkpoint and back it up on the blob store
    CompletableFuture<Map<String, String>> backupStoresFuture = restoreFuture.thenCompose(
      r -> backupRecoveredStore(jobContext, containerModel, config, taskName, storesToRestore, checkpointId,
          loggedStoreBaseDirectory, blobStoreManager, metricsRegistry, executor));

    // 3. Mark new Snapshots to never expire
    CompletableFuture<Void> removeNewSnapshotsTTLFuture = backupStoresFuture.thenCompose(
      storeSCMs -> {
        List<CompletableFuture<Void>> removeTTLForSnapshotIndexFutures = new ArrayList<>();
        storeSCMs.forEach((store, scm) -> {
          Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
              jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
          removeTTLForSnapshotIndexFutures.add(blobStoreUtil.removeTTLForSnapshotIndex(scm, requestMetadata));
        });
        return CompletableFuture.allOf(removeTTLForSnapshotIndexFutures.toArray(new CompletableFuture[0]));
      });

    // 4. Delete prev SnapshotIndex including files/subdirs
    CompletableFuture<Void> deleteOldSnapshotsFuture = removeNewSnapshotsTTLFuture.thenCompose(
      ignore -> {
        List<CompletableFuture<Void>> deletePrevSnapshotFutures = new ArrayList<>();
        oldSCMs.forEach((store, oldSCM) -> {
          Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
              jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
          deletePrevSnapshotFutures.add(blobStoreUtil.cleanSnapshotIndex(oldSCM, requestMetadata, true).toCompletableFuture());
        });
        return CompletableFuture.allOf(deletePrevSnapshotFutures.toArray(new CompletableFuture[0]));
      });

    // 5. create new checkpoint
    // backupStoresFuture is already complete by the time deleteOldSnapshotsFuture completes (step 4
    // is downstream of it), so thenCombine here only serves to hand its SCM map to writeNewCheckpoint.
    CompletableFuture<Checkpoint> newTaskCheckpointsFuture =
        deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms) ->
            writeNewCheckpoint(taskName, checkpointId, scms, checkpointManager));

    return newTaskCheckpointsFuture.exceptionally(ex -> {
      String msg = String.format("Could not restore task: %s after attempting to restore deleted blobs.", taskName);
      throw new SamzaException(msg, ex);
    });
  }
+
+  private static CompletableFuture<Map<String, String>> 
backupRecoveredStore(JobContext jobContext,
+      ContainerModel containerModel, Config config, TaskName taskName, 
Set<String> storesToBackup,
+      CheckpointId newCheckpointId, File loggedStoreBaseDirectory, 
BlobStoreManager blobStoreManager,
+      MetricsRegistry metricsRegistry, ExecutorService executor) {
+
+    BlobStoreBackupManagerMetrics blobStoreBackupManagerMetrics = new 
BlobStoreBackupManagerMetrics(metricsRegistry);
+    BlobStoreBackupManager blobStoreBackupManager =
+        new BlobStoreBackupManager(jobContext.getJobModel(), containerModel, 
containerModel.getTasks().get(taskName),
+            executor, blobStoreBackupManagerMetrics, config, 
SystemClock.instance(), loggedStoreBaseDirectory,
+            new StorageManagerUtil(), blobStoreManager);
+
+    // create checkpoint dir as a copy of store dir
+    createCheckpointDirFromStoreDirCopy(taskName, 
containerModel.getTasks().get(taskName),
+        loggedStoreBaseDirectory, storesToBackup, newCheckpointId);
+    // upload to blob store and return future
+    return blobStoreBackupManager.upload(newCheckpointId, new HashMap<>());
+  }
+
+  private static Checkpoint writeNewCheckpoint(TaskName taskName, CheckpointId 
checkpointId,
+      Map<String, String> uploadSCMs, CheckpointManager checkpointManager) {
+    CheckpointV2 oldCheckpoint = (CheckpointV2) 
checkpointManager.readLastCheckpoint(taskName);
+    Map<SystemStreamPartition, String> inputOffsets = 
oldCheckpoint.getOffsets();
+
+    ImmutableMap.Builder<String, Map<String, String>> newSCMBuilder = 
ImmutableMap.builder();
+    newSCMBuilder.put(BlobStoreStateBackendFactory.class.getName(), 
uploadSCMs);
+
+    Map<String, String> oldKafkaChangelogStateCheckpointMarkers =
+        
oldCheckpoint.getStateCheckpointMarkers().get(KafkaChangelogStateBackendFactory.class.getName());
+    if (oldKafkaChangelogStateCheckpointMarkers != null) {
+      newSCMBuilder.put(KafkaChangelogStateBackendFactory.class.getName(), 
oldKafkaChangelogStateCheckpointMarkers);
+    }
+
+    CheckpointV2 checkpointV2 = new CheckpointV2(checkpointId, inputOffsets, 
newSCMBuilder.build());
+    checkpointManager.writeCheckpoint(taskName, checkpointV2);
+    return checkpointV2;
+  }
+
+  private static void createCheckpointDirFromStoreDirCopy(TaskName taskName, 
TaskModel taskModel,
+      File loggedStoreBaseDir, Set<String> storeName, CheckpointId 
checkpointId) {
+    StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+    for (String store : storeName) {
+      try {
+        File storeDirectory =
+            storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, store, 
taskName, taskModel.getTaskMode());
+        File checkpointDir = new 
File(storageManagerUtil.getStoreCheckpointDir(storeDirectory, checkpointId));
+        FileUtils.copyDirectory(storeDirectory, checkpointDir);
+      } catch (IOException exception) {
+        String msg = String.format("Unable to create a copy of store directory 
%s into checkpoint dir %s while "
+            + "attempting to recover from DeletedException", store, 
checkpointId);
+        throw new SamzaException(msg, exception);
+      }
+    }
+  }
+
+  private static BlobStoreManager getBlobStoreManager(Config config, 
ExecutorService executor) {
+    BlobStoreConfig blobStoreConfig = new BlobStoreConfig(config);
+    String blobStoreManagerFactory = 
blobStoreConfig.getBlobStoreManagerFactory();
+    BlobStoreManagerFactory factory = 
ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
+    return factory.getRestoreBlobStoreManager(config, executor);
+  }
+
+  private static void updateRestoreTime(long startTime, SamzaContainerMetrics 
samzaContainerMetrics,
+      TaskName taskInstance) {
+    long timeToRestore = System.currentTimeMillis() - startTime;
+    if (samzaContainerMetrics != null) {
+      Gauge taskGauge = 
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, 
null);
+
+      if (taskGauge != null) {
+        taskGauge.set(timeToRestore);
+      }
+    }
+  }
+
+  private static Boolean isUnwrappedExceptionDeletedException(Throwable ex) {
+    Throwable unwrappedException = 
FutureUtil.unwrapExceptions(CompletionException.class,
+        FutureUtil.unwrapExceptions(SamzaException.class, ex));
+    return unwrappedException instanceof DeletedException;
+  }
+}
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index d9c9cd411..a22155261 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -57,6 +57,7 @@ import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ContainerStorageManagerUtil {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerStorageManagerUtil.class);
 
@@ -316,7 +317,7 @@ public class ContainerStorageManagerUtil {
   /**
    * Returns a map of backend factory names to subset of provided storeNames 
that should be restored using it.
    * For CheckpointV1, only includes stores that should be restored using a 
configured changelog.
-   * For CheckpointV2, associates stores with the highest precedence 
configured restore factory that has a SCM in
+   * For CheckpointV2, associates stores with the highest precedence 
configured restore factory that has SCM in
    * the checkpoint, or the highest precedence restore factory configured if 
there are no SCMs in the checkpoint.
    */
   public static Map<String, Set<String>> getBackendFactoryStoreNames(
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
index ea00433c3..e766a351d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskStorageCommitManager.java
@@ -86,7 +86,7 @@ public class TestTaskStorageCommitManager {
         ForkJoinPool.commonPool(), new StorageManagerUtil(), null, null);
 
     
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
-    cm.init();
+    cm.init(checkpoint);
     verify(taskBackupManager1).init(eq(checkpoint));
     verify(taskBackupManager2).init(eq(checkpoint));
   }
@@ -105,7 +105,7 @@ public class TestTaskStorageCommitManager {
     TaskStorageCommitManager cm = new TaskStorageCommitManager(task, 
backupManagers, containerStorageManager,
         Collections.emptyMap(), new Partition(1), null, new MapConfig(),
         ForkJoinPool.commonPool(), new StorageManagerUtil(), null, null);
-    cm.init();
+    cm.init(null);
     verify(taskBackupManager1).init(eq(null));
     verify(taskBackupManager2).init(eq(null));
   }
@@ -130,7 +130,7 @@ public class TestTaskStorageCommitManager {
         Collections.emptyMap(), new Partition(1), checkpointManager, new 
MapConfig(),
         ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
     
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
-    cm.init();
+    cm.init(checkpoint);
     verify(taskBackupManager1).init(eq(checkpoint));
     verify(taskBackupManager2).init(eq(checkpoint));
 
@@ -220,7 +220,7 @@ public class TestTaskStorageCommitManager {
         Collections.emptyMap(), new Partition(1), checkpointManager, new 
MapConfig(),
         ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
     
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
-    cm.init();
+    cm.init(checkpoint);
     verify(taskBackupManager1).init(eq(checkpoint));
     verify(taskBackupManager2).init(eq(checkpoint));
 
@@ -243,7 +243,7 @@ public class TestTaskStorageCommitManager {
         .thenReturn(CompletableFuture.completedFuture(factory2Checkpoints));
     when(mockLIStore.checkpoint(newCheckpointId)).thenReturn(Optional.empty());
 
-    cm.init();
+    cm.init(checkpoint);
     cm.snapshot(newCheckpointId);
 
     // Assert stores where flushed
@@ -295,7 +295,7 @@ public class TestTaskStorageCommitManager {
 
     
when(containerStorageManager.getAllStores(taskName)).thenReturn(storageEngines);
     CheckpointId newCheckpointId = CheckpointId.create();
-    cm.init();
+    cm.init(null);
     cm.snapshot(newCheckpointId);
 
     // Assert stores where flushed
@@ -457,7 +457,7 @@ public class TestTaskStorageCommitManager {
         changelogSSP, kafkaChangelogSSPOffset.toString()
     );
 
-    commitManager.init();
+    commitManager.init(null);
     // invoke persist to file system for v2 checkpoint
     commitManager.writeCheckpointToStoreDirectories(new 
CheckpointV1(offsetsJava));
 
@@ -564,7 +564,7 @@ public class TestTaskStorageCommitManager {
     );
     CheckpointV2 checkpoint = new CheckpointV2(newCheckpointId, 
Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
 
-    commitManager.init();
+    commitManager.init(null);
     // invoke persist to file system
     commitManager.writeCheckpointToStoreDirectories(checkpoint);
     // Validate only durable and persisted stores are persisted
@@ -634,7 +634,7 @@ public class TestTaskStorageCommitManager {
         changelogSSP, kafkaChangelogSSPOffset.toString()
     );
 
-    commitManager.init();
+    commitManager.init(null);
     // invoke persist to file system for v2 checkpoint
     commitManager.writeCheckpointToStoreDirectories(new 
CheckpointV1(offsetsJava));
 
@@ -722,7 +722,7 @@ public class TestTaskStorageCommitManager {
         changelogSSP, kafkaChangelogSSPOffset.toString()
     );
 
-    commitManager.init();
+    commitManager.init(null);
     // invoke persist to file system for v1 checkpoint
     commitManager.writeCheckpointToStoreDirectories(new 
CheckpointV1(offsetsJava));
 
@@ -824,7 +824,7 @@ public class TestTaskStorageCommitManager {
         changelogSSP, kafkaChangelogSSPOffset.toString()
     );
 
-    commitManager.init();
+    commitManager.init(null);
     // invoke persist to file system for v2 checkpoint
     commitManager.writeCheckpointToStoreDirectories(new 
CheckpointV1(offsetsJava));
     assertTrue(mockFileSystem.isEmpty());
@@ -870,7 +870,7 @@ public class TestTaskStorageCommitManager {
     CheckpointV2 checkpoint = new CheckpointV2(CheckpointId.create(), 
Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
     
doThrow(IOException.class).when(storageManagerUtil).writeCheckpointV2File(eq(tmpTestPath),
 eq(checkpoint));
 
-    commitManager.init();
+    commitManager.init(null);
     // Should throw samza exception since writeCheckpointV2 failed
     commitManager.writeCheckpointToStoreDirectories(checkpoint);
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
index 7a0cdc0a3..4f35521dd 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
@@ -133,7 +133,13 @@ public class TestBlobStoreBackupManager {
 
     // Mock - return snapshot index for blob id from test blob store map
     ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
-    when(blobStoreUtil.getSnapshotIndex(captor.capture(), any(Metadata.class)))
+    when(blobStoreUtil.getSnapshotIndex(captor.capture(), any(Metadata.class), 
anyBoolean()))
+        .then((Answer<CompletableFuture<SnapshotIndex>>) invocation -> {
+          String blobId = invocation.getArgumentAt(0, String.class);
+          return CompletableFuture.completedFuture(testBlobStore.get(blobId));
+        });
+
+    when(blobStoreUtil.getSnapshotIndex(captor.capture(), any(Metadata.class), 
anyBoolean()))
         .then((Answer<CompletableFuture<SnapshotIndex>>) invocation -> {
           String blobId = invocation.getArgumentAt(0, String.class);
           return CompletableFuture.completedFuture(testBlobStore.get(blobId));
@@ -163,7 +169,8 @@ public class TestBlobStoreBackupManager {
     // verify delete snapshot index blob called from init 0 times because 
prevSnapshotMap returned from init is empty
     // in case of null checkpoint.
     verify(blobStoreUtil, times(0)).deleteSnapshotIndexBlob(anyString(), 
any(Metadata.class));
-    when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(), any(Checkpoint.class), 
anySetOf(String.class))).thenCallRealMethod();
+    when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(), any(Checkpoint.class),
+        anySetOf(String.class), anyBoolean())).thenCallRealMethod();
 
     // init called with Checkpoint V1 -> unsupported
     Checkpoint checkpoint = new CheckpointV1(new HashMap<>());
@@ -292,7 +299,10 @@ public class TestBlobStoreBackupManager {
     Checkpoint checkpoint =
         new CheckpointV2(checkpointId, new HashMap<>(),
             ImmutableMap.of(BlobStoreStateBackendFactory.class.getName(), 
previousCheckpoints));
-    when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(), any(Checkpoint.class), 
anySetOf(String.class))).thenCallRealMethod();
+    when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(), any(Checkpoint.class),
+        anySetOf(String.class), anyBoolean())).thenCallRealMethod();
+    when(blobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(), any(Checkpoint.class),
+        anySetOf(String.class), anyBoolean())).thenCallRealMethod();
     blobStoreBackupManager.init(checkpoint);
 
     // mock: set task store dir to return corresponding test local store and 
create checkpoint dir
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
index ddc0c8e19..67d62759e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
@@ -88,16 +88,13 @@ public class TestBlobStoreRestoreManager {
     when(mockSnapshotIndex.getDirIndex()).thenReturn(dirIndex);
 
     BlobStoreUtil blobStoreUtil = mock(BlobStoreUtil.class);
-    when(blobStoreUtil.cleanUpDir(any(DirIndex.class), 
any(Metadata.class))).thenReturn(CompletableFuture.completedFuture(null));
-    when(blobStoreUtil.deleteDir(any(DirIndex.class), 
any(Metadata.class))).thenReturn(CompletableFuture.completedFuture(null));
-    when(blobStoreUtil.deleteSnapshotIndexBlob(anyString(), 
any(Metadata.class))).thenReturn(CompletableFuture.completedFuture(null));
+    when(blobStoreUtil.cleanSnapshotIndex(anyString(), 
any(SnapshotIndex.class), any(Metadata.class)))
+        .thenReturn(CompletableFuture.completedFuture(null));
 
     BlobStoreRestoreManager.deleteUnusedStoresFromBlobStore(
         jobName, jobId, taskName, storageConfig, blobStoreConfig, 
initialStoreSnapshotIndexes, blobStoreUtil, EXECUTOR);
 
-    verify(blobStoreUtil, times(1)).cleanUpDir(eq(dirIndex), 
any(Metadata.class));
-    verify(blobStoreUtil, times(1)).deleteDir(eq(dirIndex), 
any(Metadata.class));
-    verify(blobStoreUtil, times(1)).deleteSnapshotIndexBlob(eq(blobId), 
any(Metadata.class));
+    verify(blobStoreUtil, times(1)).cleanSnapshotIndex(eq(blobId), 
any(SnapshotIndex.class), any(Metadata.class));
 
   }
 
@@ -201,16 +198,16 @@ public class TestBlobStoreRestoreManager {
     DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
 
     // return immediately without restoring.
-    when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class)))
+    when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(null));
     when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true);
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
 
     // verify that the store directory restore was called and skipped (i.e. 
shouldRestore == true)
-    verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class));
+    verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class), anyBoolean());
     // verify that the store directory was deleted prior to restore
     // (should still not exist at the end since restore is no-op)
     assertFalse(storeDir.toFile().exists());
@@ -254,15 +251,15 @@ public class TestBlobStoreRestoreManager {
 
     when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true);
     // return immediately without restoring.
-    when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class)))
+    when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(null));
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
 
     // verify that the store directory restore was called and skipped (i.e. 
shouldRestore == true)
-    verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class));
+    verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class), anyBoolean());
     // verify that the checkpoint directories were deleted prior to restore 
(should not exist at the end)
     assertFalse(storeCheckpointDir1.toFile().exists());
     assertFalse(storeCheckpointDir2.toFile().exists());
@@ -310,15 +307,15 @@ public class TestBlobStoreRestoreManager {
     // ensures shouldRestore is not called
     when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, 
arg2) -> true);
     // return immediately without restoring.
-    when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class)))
+    when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(null));
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
 
     // verify that the store directory restore was not called (should have 
restored from checkpoint dir)
-    verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class));
+    verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()), 
eq(dirIndex), any(Metadata.class), anyBoolean());
     // verify that the checkpoint dir was renamed to store dir
     assertFalse(storeCheckpointDir.toFile().exists());
     assertTrue(storeDir.toFile().exists());
@@ -352,11 +349,11 @@ public class TestBlobStoreRestoreManager {
 
     BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, 
storesToRestore, prevStoreSnapshotIndexes,
         loggedBaseDir.toFile(), storageConfig, metrics,
-        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR);
+        storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
 
     // verify that we checked the previously checkpointed SCMs.
     verify(prevStoreSnapshotIndexes, times(1)).containsKey(eq("newStoreName"));
     // verify that the store directory restore was never called
-    verify(blobStoreUtil, times(0)).restoreDir(any(File.class), 
any(DirIndex.class), any(Metadata.class));
+    verify(blobStoreUtil, times(0)).restoreDir(any(File.class), 
any(DirIndex.class), any(Metadata.class), anyBoolean());
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
index 37512eaf3..5b79315b6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java
@@ -610,7 +610,7 @@ public class TestBlobStoreUtil {
     
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
 
     BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
-    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class))).thenAnswer(
+    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class), any(Boolean.class))).thenAnswer(
       (Answer<CompletionStage<Void>>) invocationOnMock -> {
         String blobId = invocationOnMock.getArgumentAt(0, String.class);
         OutputStream outputStream = invocationOnMock.getArgumentAt(1, 
OutputStream.class);
@@ -623,7 +623,7 @@ public class TestBlobStoreUtil {
       });
 
     BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, 
EXECUTOR, blobStoreConfig, null, null);
-    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata).join();
+    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata, false).join();
 
     assertTrue(
         new DirDiffUtil().areSameDir(Collections.emptySet(), 
false).test(restoreDirBasePath.toFile(), mockDirIndex));
@@ -663,7 +663,7 @@ public class TestBlobStoreUtil {
     
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
 
     BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
-    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class))).thenAnswer(
+    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class), any(Boolean.class))).thenAnswer(
       (Answer<CompletionStage<Void>>) invocationOnMock -> { // first try, 
retriable error
         String blobId = invocationOnMock.getArgumentAt(0, String.class);
         OutputStream outputStream = invocationOnMock.getArgumentAt(1, 
OutputStream.class);
@@ -681,7 +681,7 @@ public class TestBlobStoreUtil {
       });
 
     BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, 
EXECUTOR, blobStoreConfig, null, null);
-    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata).join();
+    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata, false).join();
 
     assertTrue(
         new DirDiffUtil().areSameDir(Collections.emptySet(), 
false).test(restoreDirBasePath.toFile(), mockDirIndex));
@@ -721,7 +721,7 @@ public class TestBlobStoreUtil {
     
when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex));
 
     BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
-    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class))).thenReturn(
+    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class), any(Boolean.class))).thenReturn(
         FutureUtil.failedFuture(new IllegalArgumentException())) // non 
retriable error
         .thenAnswer((Answer<CompletionStage<Void>>) invocationOnMock -> {
           String blobId = invocationOnMock.getArgumentAt(0, String.class);
@@ -735,7 +735,7 @@ public class TestBlobStoreUtil {
 
     BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, 
EXECUTOR, blobStoreConfig, null, null);
     try {
-      blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata).join();
+      blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, 
metadata, false).join();
       fail("Should have failed on non-retriable errors during file restore");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof IllegalArgumentException);
@@ -750,7 +750,7 @@ public class TestBlobStoreUtil {
     String localSnapshotFiles = "[a, b, z/1, y/1, p/m/1, q/n/1]";
     Path localSnapshot = BlobStoreTestUtil.createLocalDir(localSnapshotFiles);
     BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
-    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class))).thenAnswer(
+    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class), any(Boolean.class))).thenAnswer(
       (Answer<CompletionStage<Void>>) invocationOnMock -> {
         String blobId = invocationOnMock.getArgumentAt(0, String.class);
         OutputStream outputStream = invocationOnMock.getArgumentAt(1, 
OutputStream.class);
@@ -775,7 +775,7 @@ public class TestBlobStoreUtil {
     DirIndex dirIndex = BlobStoreTestUtil.createDirIndex(prevSnapshotFiles);
 
     BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class);
-    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class))).thenAnswer(
+    when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), 
any(Metadata.class), any(Boolean.class))).thenAnswer(
       (Answer<CompletionStage<Void>>) invocationOnMock -> {
         String blobId = invocationOnMock.getArgumentAt(0, String.class);
         OutputStream outputStream = invocationOnMock.getArgumentAt(1, 
OutputStream.class);
@@ -786,7 +786,7 @@ public class TestBlobStoreUtil {
 
     Path restoreDirBasePath = 
Files.createTempDirectory(BlobStoreTestUtil.TEMP_DIR_PREFIX);
     BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, 
EXECUTOR, blobStoreConfig, null, null);
-    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, 
metadata).join();
+    blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, metadata, 
false).join();
 
     assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), 
false).test(restoreDirBasePath.toFile(), dirIndex));
   }
@@ -800,7 +800,8 @@ public class TestBlobStoreUtil {
     BlobStoreUtil blobStoreUtil =
         new BlobStoreUtil(mock(BlobStoreManager.class), 
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
     Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
-        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", null, new HashSet<>());
+        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+            null, new HashSet<>(), false);
     assertTrue(snapshotIndexes.isEmpty());
   }
 
@@ -810,7 +811,8 @@ public class TestBlobStoreUtil {
     BlobStoreUtil blobStoreUtil =
         new BlobStoreUtil(mock(BlobStoreManager.class), 
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
     Map<String, Pair<String, SnapshotIndex>> prevSnapshotIndexes =
-        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", mockCheckpoint, new HashSet<>());
+        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+            mockCheckpoint, new HashSet<>(), false);
     assertEquals(prevSnapshotIndexes.size(), 0);
   }
 
@@ -824,7 +826,8 @@ public class TestBlobStoreUtil {
     BlobStoreUtil blobStoreUtil =
         new BlobStoreUtil(mock(BlobStoreManager.class), 
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
     Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
-        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", mockCheckpoint, new HashSet<>());
+        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+            mockCheckpoint, new HashSet<>(), false);
     assertTrue(snapshotIndexes.isEmpty());
   }
 
@@ -838,7 +841,8 @@ public class TestBlobStoreUtil {
     BlobStoreUtil blobStoreUtil =
         new BlobStoreUtil(mock(BlobStoreManager.class), 
MoreExecutors.newDirectExecutorService(), blobStoreConfig, null, null);
     Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
-        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", mockCheckpoint, new HashSet<>());
+        blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+            mockCheckpoint, new HashSet<>(), false);
     assertTrue(snapshotIndexes.isEmpty());
   }
 
@@ -849,36 +853,11 @@ public class TestBlobStoreUtil {
     Set<String> storesToBackupOrRestore = new HashSet<>();
     storesToBackupOrRestore.add("storeName");
     BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
-    when(mockBlobStoreUtil.getSnapshotIndex(anyString(), 
any(Metadata.class))).thenThrow(new RuntimeException());
+    when(mockBlobStoreUtil.getSnapshotIndex(anyString(), any(Metadata.class), 
anyBoolean())).thenThrow(new RuntimeException());
     when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(),
-        any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
-    mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", checkpoint, storesToBackupOrRestore);
-  }
-
-  @Test
-  public void testGetSSISkipsStoresWithSnapshotIndexAlreadyDeleted() {
-    String store = "storeName1";
-    String otherStore = "storeName2";
-    Checkpoint checkpoint = 
createCheckpointV2(BlobStoreStateBackendFactory.class.getName(),
-        ImmutableMap.of(store, "snapshotIndexBlobId1", otherStore, 
"snapshotIndexBlobId2"));
-    Set<String> storesToBackupOrRestore = new HashSet<>();
-    storesToBackupOrRestore.add(store);
-    storesToBackupOrRestore.add(otherStore);
-    SnapshotIndex store1SnapshotIndex = mock(SnapshotIndex.class);
-    BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
-
-    CompletableFuture<SnapshotIndex> failedFuture = 
FutureUtil.failedFuture(new DeletedException());
-    when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId1"), 
any(Metadata.class))).thenReturn(
-        CompletableFuture.completedFuture(store1SnapshotIndex));
-    when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId2"), 
any(Metadata.class))).thenReturn(failedFuture);
-    when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(),
-        any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
-
-    Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
-        mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", checkpoint, storesToBackupOrRestore);
-    assertEquals(1, snapshotIndexes.size());
-    assertEquals("snapshotIndexBlobId1", 
snapshotIndexes.get("storeName1").getLeft());
-    assertEquals(store1SnapshotIndex, 
snapshotIndexes.get("storeName1").getRight());
+        any(Checkpoint.class), anySetOf(String.class), 
any(Boolean.class))).thenCallRealMethod();
+    mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+        checkpoint, storesToBackupOrRestore, false);
   }
 
   @Test
@@ -893,15 +872,16 @@ public class TestBlobStoreUtil {
     SnapshotIndex store1SnapshotIndex = mock(SnapshotIndex.class);
     BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
     when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(),
-        any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
+        any(Checkpoint.class), anySetOf(String.class), 
anyBoolean())).thenCallRealMethod();
     RuntimeException nonIgnoredException = new RuntimeException();
     CompletableFuture<SnapshotIndex> failedFuture = 
FutureUtil.failedFuture(nonIgnoredException);
-    when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId1"), 
any(Metadata.class))).thenReturn(
+    when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId1"), 
any(Metadata.class), anyBoolean())).thenReturn(
         FutureUtil.failedFuture(new DeletedException())); // should fail even 
if some errors are ignored
-    when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId2"), 
any(Metadata.class))).thenReturn(failedFuture);
+    when(mockBlobStoreUtil.getSnapshotIndex(eq("snapshotIndexBlobId2"), 
any(Metadata.class), anyBoolean())).thenReturn(failedFuture);
 
     try {
-      mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", checkpoint, storesToBackupOrRestore);
+      mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+          checkpoint, storesToBackupOrRestore, false);
       fail("Should have thrown an exception");
     } catch (Exception e) {
       Throwable cause =
@@ -925,21 +905,22 @@ public class TestBlobStoreUtil {
 
     BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
 
-    when(mockBlobStoreUtil.getSnapshotIndex(eq(storeSnapshotIndexBlobId), 
any(Metadata.class))).thenReturn(
+    when(mockBlobStoreUtil.getSnapshotIndex(eq(storeSnapshotIndexBlobId), 
any(Metadata.class), any(Boolean.class))).thenReturn(
         CompletableFuture.completedFuture(mockStoreSnapshotIndex));
-    when(mockBlobStoreUtil.getSnapshotIndex(eq(otherStoreSnapshotIndexBlobId), 
any(Metadata.class))).thenReturn(
+    when(mockBlobStoreUtil.getSnapshotIndex(eq(otherStoreSnapshotIndexBlobId), 
any(Metadata.class), any(Boolean.class))).thenReturn(
         CompletableFuture.completedFuture(mockOtherStooreSnapshotIndex));
     when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(),
-        any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
+        any(Checkpoint.class), anySetOf(String.class), 
any(Boolean.class))).thenCallRealMethod();
 
     Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
-        mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", checkpoint, storesToBackupOrRestore);
+        mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+            checkpoint, storesToBackupOrRestore, false);
 
     assertEquals(storeSnapshotIndexBlobId, 
snapshotIndexes.get(storeName).getKey());
     assertEquals(mockStoreSnapshotIndex, 
snapshotIndexes.get(storeName).getValue());
     assertEquals(otherStoreSnapshotIndexBlobId, 
snapshotIndexes.get(otherStoreName).getKey());
     assertEquals(mockOtherStooreSnapshotIndex, 
snapshotIndexes.get(otherStoreName).getValue());
-    verify(mockBlobStoreUtil, times(2)).getSnapshotIndex(anyString(), 
any(Metadata.class));
+    verify(mockBlobStoreUtil, times(2)).getSnapshotIndex(anyString(), 
any(Metadata.class), any(Boolean.class));
   }
 
   @Test
@@ -952,15 +933,17 @@ public class TestBlobStoreUtil {
     CheckpointV2 checkpoint = 
createCheckpointV2(BlobStoreStateBackendFactory.class.getName(),
         ImmutableMap.of(store, "1", anotherStore, "2", oneMoreStore, "3"));
     BlobStoreUtil mockBlobStoreUtil = mock(BlobStoreUtil.class);
-    when(mockBlobStoreUtil.getSnapshotIndex(any(String.class), 
any(Metadata.class)))
+    when(mockBlobStoreUtil.getSnapshotIndex(any(String.class), 
any(Metadata.class), anyBoolean()))
         .thenReturn(CompletableFuture.completedFuture(mockStoreSnapshotIndex));
     when(mockBlobStoreUtil.getStoreSnapshotIndexes(anyString(), anyString(), 
anyString(),
-        any(Checkpoint.class), anySetOf(String.class))).thenCallRealMethod();
+        any(Checkpoint.class), anySetOf(String.class), 
anyBoolean())).thenCallRealMethod();
 
     Map<String, Pair<String, SnapshotIndex>> snapshotIndexes =
-        mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName", checkpoint, storesToBackupOrRestore);
+        mockBlobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", 
"taskName",
+            checkpoint, storesToBackupOrRestore, false);
 
-    verify(mockBlobStoreUtil, 
times(storesToBackupOrRestore.size())).getSnapshotIndex(anyString(), 
any(Metadata.class));
+    verify(mockBlobStoreUtil, times(storesToBackupOrRestore.size()))
+        .getSnapshotIndex(anyString(), any(Metadata.class), anyBoolean());
   }
 
   /**
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 6b2154e00..d09ad0758 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -25,6 +25,7 @@ import java.util
 import java.util.concurrent.atomic.AtomicReference
 import javax.servlet.http.{HttpServlet, HttpServletRequest, 
HttpServletResponse}
 import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config.{ClusterManagerConfig, Config, MapConfig}
 import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
 import org.apache.samza.coordinator.JobModelManager
@@ -90,7 +91,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
 
   @Test
   def testExceptionInTaskInitShutsDownTask() {
-    when(this.taskInstance.initTask).thenThrow(new RuntimeException("Trigger a 
shutdown, please."))
+    when(this.taskInstance.initTask(any[Option[Checkpoint]])).thenThrow(new 
RuntimeException("Trigger a shutdown, please."))
 
     this.samzaContainer.run
 
@@ -105,7 +106,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
 
   @Test
   def testErrorInTaskInitShutsDownTask(): Unit = {
-    when(this.taskInstance.initTask).thenThrow(new NoSuchMethodError("Trigger 
a shutdown, please."))
+    when(this.taskInstance.initTask(any[Option[Checkpoint]])).thenThrow(new 
NoSuchMethodError("Trigger a shutdown, please."))
 
     this.samzaContainer.run
 
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 08b082ca2..6afec52e7 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -186,7 +186,7 @@ class TestTaskInstance extends AssertionsForJUnit with 
MockitoSugar {
 
   @Test
   def testInitTask(): Unit = {
-    this.taskInstance.initTask
+    this.taskInstance.initTask(Some(mock[Checkpoint]))
 
     val contextCaptor = ArgumentCaptor.forClass(classOf[Context])
     verify(this.task).init(contextCaptor.capture())
@@ -221,7 +221,7 @@ class TestTaskInstance extends AssertionsForJUnit with 
MockitoSugar {
         null
       }
     })
-    taskInstance.initTask
+    taskInstance.initTask(Some(mock[Checkpoint]))
 
     verify(this.offsetManager).setStartingOffset(TASK_NAME, 
SYSTEM_STREAM_PARTITION, "10")
   }
@@ -1011,7 +1011,7 @@ class TestTaskInstance extends AssertionsForJUnit with 
MockitoSugar {
   @Test
   def testNoApplicationTaskContextFactoryProvided() {
     setupTaskInstance(None)
-    this.taskInstance.initTask
+    this.taskInstance.initTask(Some(mock[Checkpoint]))
     this.taskInstance.shutdownTask
     verifyZeroInteractions(this.applicationTaskContext)
   }
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 15785644e..2b2b51885 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -21,7 +21,9 @@ package org.apache.samza.storage;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,8 +32,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.checkpoint.CheckpointV1;
 import org.apache.samza.checkpoint.CheckpointV2;
@@ -46,11 +53,23 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+import org.apache.samza.storage.blobstore.BlobStoreRestoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.exceptions.DeletedException;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
 import org.apache.samza.system.SSPMetadataCache;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
@@ -60,19 +79,26 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.SystemClock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import scala.collection.JavaConverters;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ReflectionUtil.class, 
ContainerStorageManagerRestoreUtil.class})
 public class TestContainerStorageManager {
 
   private static final String STORE_NAME = "store";
@@ -247,6 +273,9 @@ public class TestContainerStorageManager {
       return CompletableFuture.completedFuture(null);
     }).when(restoreManager).restore();
 
+    Map<TaskName, TaskInstanceCollector> taskInstanceCollectors = new 
HashMap<>();
+    tasks.keySet().forEach(taskName -> taskInstanceCollectors.put(taskName, 
mock(TaskInstanceCollector.class)));
+
     // Create the container storage manager
     this.containerStorageManager = new ContainerStorageManager(
         checkpointManager,
@@ -264,7 +293,7 @@ public class TestContainerStorageManager {
         mock(JobContext.class),
         mockContainerContext,
         ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, 
backendFactory),
-        mock(Map.class),
+        taskInstanceCollectors,
         DEFAULT_LOGGED_STORE_BASE_DIR,
         DEFAULT_STORE_BASE_DIR,
         null,
@@ -502,6 +531,173 @@ public class TestContainerStorageManager {
         factoriesToStores.get("factory2"));
   }
 
+  @Test
+  public void testInitRecoversFromDeletedException() {
+    TaskName taskName = new TaskName("task");
+    Set<String> stores = Collections.singleton("store");
+
+    BlobStoreRestoreManager taskRestoreManager = 
mock(BlobStoreRestoreManager.class);
+    Throwable deletedException = new SamzaException(new 
CompletionException(new DeletedException("410 gone")));
+    
doThrow(deletedException).when(taskRestoreManager).init(any(Checkpoint.class));
+    
when(taskRestoreManager.restore()).thenReturn(CompletableFuture.completedFuture(null));
+    
when(taskRestoreManager.restore(true)).thenReturn(CompletableFuture.completedFuture(null));
+
+    // mock ReflectionUtil.getObj
+    PowerMockito.mockStatic(ReflectionUtil.class);
+    BlobStoreManagerFactory blobStoreManagerFactory = 
mock(BlobStoreManagerFactory.class);
+    BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+    PowerMockito.when(ReflectionUtil.getObj(anyString(), 
eq(BlobStoreManagerFactory.class)))
+        .thenReturn(blobStoreManagerFactory);
+    when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class), 
any(ExecutorService.class)))
+        .thenReturn(blobStoreManager);
+
+    Map<String, TaskRestoreManager> storeTaskRestoreManager = 
ImmutableMap.of("store", taskRestoreManager);
+    CheckpointManager checkpointManager = mock(CheckpointManager.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getJobModel()).thenReturn(mock(JobModel.class));
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(new TaskName("test"));
+    ContainerModel containerModel = mock(ContainerModel.class);
+    when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, 
taskModel));
+    Checkpoint checkpoint = mock(CheckpointV2.class);
+    Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName, 
checkpoint);
+    Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
+        ImmutableMap.of(taskName, 
ImmutableMap.of(BlobStoreStateBackendFactory.class.getName(), stores));
+    Config config = new MapConfig(ImmutableMap.of("job.name", "test"), 
ImmutableMap.of("stores.store.backup.factories", 
BlobStoreStateBackendFactory.class.getName()));
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    SystemConsumer systemConsumer = mock(SystemConsumer.class);
+
+    
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName,
 storeTaskRestoreManager),
+        samzaContainerMetrics, checkpointManager, jobContext, containerModel, 
taskCheckpoints,
+        taskBackendFactoryToStoreNames, config, executor, new HashMap<>(), 
null,
+        ImmutableMap.of("store", systemConsumer));
+
+    // verify init() is called twice -> once without getDeleted flag, once 
with getDeleted flag
+    verify(taskRestoreManager, times(1)).init(any(Checkpoint.class));
+    verify(taskRestoreManager, times(1)).init(any(Checkpoint.class), 
anyBoolean());
+    // verify restore is called with getDeletedFlag only
+    verify(taskRestoreManager, times(0)).restore();
+    verify(taskRestoreManager, times(1)).restore(true);
+  }
+
+  @Test
+  public void testRestoreRecoversFromDeletedException() throws Exception {
+    TaskName taskName = new TaskName("task");
+    Set<String> stores = Collections.singleton("store");
+
+    BlobStoreRestoreManager taskRestoreManager = 
mock(BlobStoreRestoreManager.class);
+    doNothing().when(taskRestoreManager).init(any(Checkpoint.class));
+
+    CompletableFuture<Void> failedFuture = 
CompletableFuture.completedFuture(null)
+        .thenCompose(v -> { throw new DeletedException("410 Gone"); });
+    when(taskRestoreManager.restore()).thenReturn(failedFuture);
+    
when(taskRestoreManager.restore(true)).thenReturn(CompletableFuture.completedFuture(null));
+
+    Map<String, TaskRestoreManager> factoryToTaskRestoreManager = 
ImmutableMap.of(
+        BlobStoreStateBackendFactory.class.getName(), taskRestoreManager);
+
+    JobContext jobContext = mock(JobContext.class);
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(taskName);
+    when(taskModel.getTaskMode()).thenReturn(TaskMode.Active);
+
+    ContainerModel containerModel = mock(ContainerModel.class);
+    when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, 
taskModel));
+
+    CheckpointV2 checkpoint = mock(CheckpointV2.class);
+    when(checkpoint.getOffsets()).thenReturn(ImmutableMap.of());
+    when(checkpoint.getCheckpointId()).thenReturn(CheckpointId.create());
+    when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
+        KafkaChangelogStateBackendFactory.class.getName(), new HashMap<>()));
+
+    CheckpointManager checkpointManager = mock(CheckpointManager.class);
+    
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
+
+    String expectedOldBlobId = "oldBlobId";
+    when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
+        BlobStoreStateBackendFactory.class.getName(), ImmutableMap.of("store", 
expectedOldBlobId)));
+
+    Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName, 
checkpoint);
+
+    Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
+        ImmutableMap.of(taskName, ImmutableMap.of(
+            BlobStoreStateBackendFactory.class.getName(), stores));
+
+    Config config = new MapConfig(ImmutableMap.of(
+        "blob.store.manager.factory", 
BlobStoreStateBackendFactory.class.getName(),
+        "job.name", "test"));
+
+    ExecutorService executor = Executors.newFixedThreadPool(5);
+
+    SystemConsumer systemConsumer = mock(SystemConsumer.class);
+
+    // mock ReflectionUtil.getObj
+    PowerMockito.mockStatic(ReflectionUtil.class);
+    BlobStoreManagerFactory blobStoreManagerFactory = 
mock(BlobStoreManagerFactory.class);
+    BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+    PowerMockito.when(ReflectionUtil.getObj(anyString(), 
eq(BlobStoreManagerFactory.class)))
+        .thenReturn(blobStoreManagerFactory);
+    when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class), 
any(ExecutorService.class)))
+        .thenReturn(blobStoreManager);
+
+    // mock ContainerStorageManagerRestoreUtil.backupRecoveredStore
+    String expectedBlobId = "blobId";
+    PowerMockito.spy(ContainerStorageManagerRestoreUtil.class);
+    
PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of("store",
 expectedBlobId)))
+        .when(ContainerStorageManagerRestoreUtil.class, "backupRecoveredStore",
+            any(JobContext.class), any(ContainerModel.class), 
any(Config.class),
+            any(TaskName.class), any(Set.class), any(Checkpoint.class), 
any(File.class),
+            any(BlobStoreManager.class), any(MetricsRegistry.class), 
any(ExecutorService.class));
+
+    SnapshotIndex snapshotIndex = new SnapshotIndex(System.currentTimeMillis(),
+        new SnapshotMetadata(CheckpointId.create(), "job", "test", "task", 
"store"),
+        new DirIndex("test", new ArrayList<>(), new ArrayList<>(), new 
ArrayList<>(), new ArrayList<>()),
+        Optional.empty());
+
+    ArgumentCaptor<String> getBlobIdCaptor = 
ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<ByteArrayOutputStream> outputStreamCaptor = 
ArgumentCaptor.forClass(ByteArrayOutputStream.class);
+    when(blobStoreManager.get(getBlobIdCaptor.capture(), 
outputStreamCaptor.capture(),
+        any(Metadata.class), any(Boolean.class)))
+        .thenAnswer(invocation -> {
+          ByteArrayOutputStream outputStream = outputStreamCaptor.getValue();
+          outputStream.write(new SnapshotIndexSerde().toBytes(snapshotIndex));
+          return CompletableFuture.completedFuture(null);
+        });
+
+    ArgumentCaptor<String> removeTTLBlobIdCaptor = 
ArgumentCaptor.forClass(String.class);
+    when(blobStoreManager.removeTTL(removeTTLBlobIdCaptor.capture(), 
any(Metadata.class)))
+        .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
+
+    ArgumentCaptor<String> deleteBlobIdCaptor = 
ArgumentCaptor.forClass(String.class);
+    when(blobStoreManager.delete(deleteBlobIdCaptor.capture(), 
any(Metadata.class)))
+        .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
+
+    Map<TaskName, Checkpoint> updatedTaskCheckpoints =
+        
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName,
 factoryToTaskRestoreManager),
+            samzaContainerMetrics, checkpointManager, jobContext, 
containerModel, taskCheckpoints,
+            taskBackendFactoryToStoreNames, config, executor, new HashMap<>(), 
null,
+            ImmutableMap.of("store", systemConsumer)).get();
+
+    // verify taskCheckpoint is updated
+    assertNotEquals(((CheckpointV2) 
taskCheckpoints.get(taskName)).getCheckpointId(),
+        ((CheckpointV2) 
updatedTaskCheckpoints.get(taskName)).getCheckpointId());
+
+    // verify init is not retried with getDeleted
+    verify(taskRestoreManager, times(0)).init(any(Checkpoint.class), 
anyBoolean());
+
+    // verify restore is called twice - once without getDeleted flag, once with 
getDeleted flag
+    verify(taskRestoreManager, times(1)).restore();
+    verify(taskRestoreManager, times(1)).restore(true);
+
+    // verify the GET and removeTTL were called on the new SnapshotIndex
+    assertEquals(expectedBlobId, getBlobIdCaptor.getAllValues().get(0));
+    assertEquals(expectedBlobId, removeTTLBlobIdCaptor.getAllValues().get(0));
+
+    // verify that GET and delete were called on the old SnapshotIndex
+    assertEquals(expectedOldBlobId, getBlobIdCaptor.getAllValues().get(1));
+    assertEquals(expectedOldBlobId, deleteBlobIdCaptor.getValue());
+  }
+
   @Test
   public void getActiveTaskChangelogSystemStreams() {
     Map<String, SystemStream> storeToChangelogSystemStreams =
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
 
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
index 8bb93f769..ad869f0a8 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskBackupManager.java
@@ -72,7 +72,7 @@ public class TestTransactionalStateTaskBackupManager {
     
doReturn(mock(java.util.Map.class)).when(tsm).getNewestChangelogSSPOffsets(any(),
 any(), any(), any());
 
     // invoke Kafka flush
-    commitManager.init();
+    commitManager.init(null);
     commitManager.snapshot(CheckpointId.create());
 
     // ensure that stores are flushed before we get newest changelog offsets
diff --git 
a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java 
b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
index 9ab870649..1012646a9 100644
--- 
a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
+++ 
b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
@@ -175,10 +175,12 @@ public class MyStatefulApplication implements 
TaskApplication {
     public void init(Context context) {
       storeNames.forEach(storeName -> {
         KeyValueStore<String, String> store = (KeyValueStore<String, String>) 
context.getTaskContext().getStore(storeName);
+        LOG.debug("For storename:{} received: {}", storeName, store);
         stores.add(store); // any input messages will be written to all 
'stores'
         KeyValueIterator<String, String> storeEntries = store.all();
         List<String> storeInitialContents = new ArrayList<>();
         while (storeEntries.hasNext()) {
+          LOG.debug("INIT {} Store content. StoreInitialContent: {}", 
storeName, storeInitialContents);
           storeInitialContents.add(storeEntries.next().getValue());
         }
         initialStoreContents.put(storeName, storeInitialContents);
@@ -188,11 +190,14 @@ public class MyStatefulApplication implements 
TaskApplication {
       inMemoryStoreNames.forEach(storeName -> {
         KeyValueStore<String, String> store =
             (KeyValueStore<String, String>) 
context.getTaskContext().getStore(storeName);
+        LOG.debug("For storename:{} received: {}", storeName, store);
         stores.add(store); // any input messages will be written to all 
'stores'.
         KeyValueIterator<String, String> storeEntries = store.all();
         List<String> storeInitialContents = new ArrayList<>();
+        LOG.debug("INIT InMemory Store content:{} ", storeName);
         while (storeEntries.hasNext()) {
           storeInitialContents.add(storeEntries.next().getValue());
+          LOG.debug("INIT InMemory Store content. StoreInitialContent: {}", 
storeInitialContents);
         }
         initialInMemoryStoreContents.put(storeName, storeInitialContents);
         storeEntries.close();
@@ -201,6 +206,7 @@ public class MyStatefulApplication implements 
TaskApplication {
       if (sideInputStoreName.isPresent()) {
         KeyValueStore<String, String> sideInputStore =
             (KeyValueStore<String, String>) 
context.getTaskContext().getStore(sideInputStoreName.get());
+        LOG.debug("For storename:{} received: {}", sideInputStoreName, 
sideInputStore);
         KeyValueIterator<String, String> sideInputStoreEntries = 
sideInputStore.all();
         List<String> sideInputStoreInitialContents = new ArrayList<>();
         while (sideInputStoreEntries.hasNext()) {
@@ -219,25 +225,38 @@ public class MyStatefulApplication implements 
TaskApplication {
 
       if (key.endsWith("crash_once")) {  // endsWith allows :crash_once and 
crash_once
         if (!crashedOnce) {
+          LOG.debug("Process in my task: CrashOnce received.");
           crashedOnce = true;
           coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
         } else {
           return;
         }
       } else if (key.endsWith("shutdown")) {
+        LOG.debug("Process in my task: Shutdown received.");
         coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
       } else if (key.startsWith("-")) {
-        stores.forEach(store -> store.delete(key.substring(1)));
+        stores.forEach(store -> {
+          LOG.debug("Process in my task: - received. Deleting: {} from {}", 
key, store);
+          store.delete(key.substring(1));
+        });
       } else if (key.startsWith(":")) {
         // write the message and flush, but don't invoke commit later
         String msg = key.substring(1);
-        stores.forEach(store -> store.put(msg, msg));
+        stores.forEach(store -> {
+          LOG.debug("Process in my task: ':' received. Put {} in {}", msg, 
store);
+          store.put(msg, msg);
+        });
       } else {
-        stores.forEach(store -> store.put(key, key));
+        stores.forEach(store -> {
+          LOG.debug("Process in my task: Adding key to store. Put {} in {}", 
key, store);
+          store.put(key, key);
+        });
       }
+      LOG.debug("Process in my task: Flush received.");
       stores.forEach(KeyValueStore::flush);
 
       if (!key.startsWith(":")) {
+        LOG.debug("Process in my task: ':' not received. Calling commit: {}", 
TaskCoordinator.RequestScope.CURRENT_TASK);
         coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
       }
     }
diff --git 
a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
index 1779ac735..e79d1b86e 100644
--- 
a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -51,15 +51,21 @@ import org.apache.samza.storage.MyStatefulApplication;
 import org.apache.samza.storage.SideInputsProcessor;
 import org.apache.samza.storage.StorageManagerUtil;
 import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.FileBlob;
+import org.apache.samza.storage.blobstore.index.FileIndex;
 import org.apache.samza.storage.blobstore.index.SnapshotIndex;
 import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.test.util.TestBlobStoreManager;
 import org.apache.samza.util.FileUtil;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -124,6 +130,8 @@ public class BlobStoreStateBackendIntegrationTest extends 
BaseStateBackendIntegr
 
   private final boolean hostAffinity;
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlobStoreStateBackendIntegrationTest.class);
+
   public BlobStoreStateBackendIntegrationTest(boolean hostAffinity) {
     this.hostAffinity = hostAffinity;
   }
@@ -131,6 +139,7 @@ public class BlobStoreStateBackendIntegrationTest extends 
BaseStateBackendIntegr
   @Before
   @Override
   public void setUp() {
+    LOG.debug("Starting setup");
     super.setUp();
     // reset static state shared with task between each parameterized iteration
     MyStatefulApplication.resetTestState();
@@ -138,7 +147,15 @@ public class BlobStoreStateBackendIntegrationTest extends 
BaseStateBackendIntegr
     fileUtil.rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store 
on startup
     // no need to clear ledger dir since subdir of blob store base dir
     fileUtil.rm(new File(BLOB_STORE_BASE_DIR)); // always clear local "blob 
store" on startup
+  }
 
+  @After
+  @Override
+  public void tearDown() {
+    FileUtil fileUtil = new FileUtil();
+    fileUtil.rm(new File(LOGGED_STORE_BASE_DIR));
+    fileUtil.rm(new File(BLOB_STORE_BASE_DIR));
+    LOG.debug("Tear down complete");
   }
 
   @Test
@@ -197,6 +214,124 @@ public class BlobStoreStateBackendIntegrationTest extends 
BaseStateBackendIntegr
     verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot), 
hostAffinity, true, true);
   }
 
+  @Test
+  public void stopDeleteBlobRun() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", 
"97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", 
"4", "5", "6");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        CONFIGS);
+
+    Pair<String, SnapshotIndex> lastRegularSnapshot =
+        verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity, 
false, false);
+    Pair<String, SnapshotIndex> lastSideInputSnapshot =
+        verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity, 
true,
+            false /* no side input offsets file will be present during initial 
restore */);
+
+    // verifies transactional state too
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", 
":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", 
"9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", 
"2", "3");
+    // verifies that in-memory stores backed by changelogs work correctly
+    // (requires overriding store level state backends explicitly)
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = 
Arrays.asList("1", "2", "3");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new 
ArrayList<>(sideInputMessagesOnInitialRun);
+    
expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+
+    deleteBlobFromLastCheckpoint(lastRegularSnapshot.getRight());
+
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+
+    verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot), 
hostAffinity, false, false);
+    verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot), 
hostAffinity, true, true);
+  }
+
+  @Test
+  public void stopDeleteSnapshotIndexBlobRun() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", 
"97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", 
"4", "5", "6");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        CONFIGS);
+
+    Pair<String, SnapshotIndex> lastRegularSnapshot =
+        verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity, 
false, false);
+    Pair<String, SnapshotIndex> lastSideInputSnapshot =
+        verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity, 
true,
+            false /* no side input offsets file will be present during initial 
restore */);
+
+    // verifies transactional state too
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", 
":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", 
"9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", 
"2", "3");
+    // verifies that in-memory stores backed by changelogs work correctly
+    // (requires overriding store level state backends explicitly)
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = 
Arrays.asList("1", "2", "3");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new 
ArrayList<>(sideInputMessagesOnInitialRun);
+    
expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+
+    deleteLastSnapshotIndex(lastRegularSnapshot);
+
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+
+    verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot), 
hostAffinity, false, false);
+    verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot), 
hostAffinity, true, true);
+  }
+
   /**
    * Verifies the ledger for TestBlobStoreManager.
    * @param startingSnapshot snapshot file name and files present in snapshot 
at the beginning of verification (from previous run), if any.
@@ -261,6 +396,53 @@ public class BlobStoreStateBackendIntegrationTest extends 
BaseStateBackendIntegr
     }
   }
 
+  private void deleteLastSnapshotIndex(Pair<String, SnapshotIndex> 
pathSnapshotIndexPair) {
+    
deleteCheckpointFromBlobStore(pathSnapshotIndexPair.getRight().getDirIndex());
+    try {
+      LOG.debug("Deleted last SnapshotIndex: {}", 
pathSnapshotIndexPair.getLeft());
+      Files.move(Paths.get(pathSnapshotIndexPair.getLeft()), 
Paths.get(pathSnapshotIndexPair.getLeft() + "-DELETED")); // Simulate deletion by renaming
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to delete file: " + 
pathSnapshotIndexPair.getLeft(), e);
+    }
+  }
+
+  private void deleteBlobFromLastCheckpoint(SnapshotIndex snapshotIndex) {
+    LOG.debug("Deleting SnapshotIndex {} and all associated blobs", 
snapshotIndex);
+    DirIndex dirIndex = snapshotIndex.getDirIndex();
+    // simulate partial blob deletion - tombstone the first blob of each file in the snapshot
+    List<FileIndex> files = dirIndex.getFilesPresent();
+    for (FileIndex file: files) {
+      FileBlob blob = file.getBlobs().get(0);
+      try {
+        Files.move(Paths.get(blob.getBlobId()), Paths.get(blob.getBlobId() + 
"-DELETED")); // Simulate deletion by renaming
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to delete file: " + 
blob.getBlobId(), e);
+      }
+    }
+  }
+
+  /**
+   * Helper function to recursively delete all files from local blob store
+   * @param dirIndex root directory index whose files and subdirectories are recursively deleted
+   */
+  private void deleteCheckpointFromBlobStore(DirIndex dirIndex) {
+    for (FileIndex file: dirIndex.getFilesPresent()) {
+      List<FileBlob> blobs = file.getBlobs();
+      for (FileBlob blob: blobs) {
+        String blobPath = blob.getBlobId();
+        try {
+          LOG.debug("Deleting blob: {}", blobPath);
+          Files.move(Paths.get(blobPath), Paths.get(blobPath + "-DELETED")); 
// Simulate deletion by renaming
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to delete file: " + blobPath, e);
+        }
+      }
+    }
+    for (DirIndex subdir: dirIndex.getSubDirsPresent()) {
+      deleteCheckpointFromBlobStore(subdir);
+    }
+  }
+
   static class MySideInputProcessor implements SideInputsProcessor, 
Serializable {
     @Override
     public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, 
KeyValueStore store) {
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java 
b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
index d247c10e3..76441f388 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.concurrent.CompletableFuture;
@@ -35,6 +36,7 @@ import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.config.Config;
 import org.apache.samza.storage.blobstore.BlobStoreManager;
 import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.exceptions.DeletedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +49,7 @@ public class TestBlobStoreManager implements BlobStoreManager 
{
   public static final String LEDGER_FILES_DELETED = "filesRemoved";
   public static final String LEDGER_FILES_TTL_UPDATED = "filesTTLUpdated";
 
+  private final String deletedTombstone = "-DELETED";
   private final Path stateLocation;
   private final File filesAddedLedger;
   private final File filesReadLedger;
@@ -97,24 +100,45 @@ public class TestBlobStoreManager implements 
BlobStoreManager {
         metadata.getTaskName(), metadata.getStoreName(), suffix);
     LOG.info("Creating file at {}", destination);
     try {
-      FileUtils.writeStringToFile(filesAddedLedger, destination + "\n", 
Charset.defaultCharset(), true);
       FileUtils.copyInputStreamToFile(inputStream, destination.toFile());
+      FileUtils.writeStringToFile(filesAddedLedger, destination + "\n", 
Charset.defaultCharset(), true);
     } catch (IOException e) {
+      LOG.error("Error creating file: " + destination);
       throw new RuntimeException("Error creating file " + destination, e);
     }
     return CompletableFuture.completedFuture(destination.toString());
   }
 
   @Override
-  public CompletionStage<Void> get(String id, OutputStream outputStream, 
Metadata metadata) {
+  public CompletionStage<Void> get(String id, OutputStream outputStream, 
Metadata metadata, boolean getDeletedBlob) {
     LOG.info("Reading file at {}", id);
+    Path filePath = Paths.get(id);
     try {
-      FileUtils.writeStringToFile(filesReadLedger, id + "\n", 
Charset.defaultCharset(), true);
-      Path path = Paths.get(id);
-      Files.copy(path, outputStream);
+      Files.copy(filePath, outputStream);
       outputStream.flush();
+      FileUtils.writeStringToFile(filesReadLedger, id + "\n", 
Charset.defaultCharset(), true);
+    } catch (NoSuchFileException noSuchFileException) {
+      // Blob marked for deletion is suffixed with '-DELETED'. Retrieve 
deleted blob if getDeletedBlob is true.
+      Path deletedFilePath = Paths.get(id + deletedTombstone);
+      String msg = String.format("File id: %s was not found. GetDeletedBlob 
was set to false", id);
+      if (!Files.exists(deletedFilePath)) {
+        throw new RuntimeException("404: " + msg, noSuchFileException);
+      }
+      if (getDeletedBlob) {
+        try {
+          Files.copy(deletedFilePath, outputStream);
+          outputStream.flush();
+          FileUtils.writeStringToFile(filesReadLedger, id + "\n", 
Charset.defaultCharset(), true);
+          LOG.info("Deleted File id {} found - Get deleted was set to true", 
id);
+        } catch (IOException e) {
+          throw new RuntimeException("Error reading file with GetDeleted set 
to true. File: " + id, e);
+        }
+      } else {
+        LOG.info("File id {} is deleted - Get deleted was set to false", id);
+        throw new DeletedException("410: " + msg, noSuchFileException);
+      }
     } catch (IOException e) {
-      throw new RuntimeException("Error reading file for id " + id, e);
+      throw new RuntimeException("Error reading file for id " + id + ". Get 
deleted was set to " + getDeletedBlob, e);
     }
     return CompletableFuture.completedFuture(null);
   }
@@ -124,7 +148,10 @@ public class TestBlobStoreManager implements 
BlobStoreManager {
     LOG.info("Deleting file at {}", id);
     try {
       FileUtils.writeStringToFile(filesDeletedLedger, id + "\n", 
Charset.defaultCharset(), true);
-      Files.delete(Paths.get(id));
+      // Suffix with '-DELETED' instead of actually deleting the file.
+      Files.move(Paths.get(id), Paths.get(id + deletedTombstone));
+    } catch (NoSuchFileException ex) {
+      LOG.warn("File might already be deleted. id: " + id);
     } catch (IOException e) {
       throw new RuntimeException("Error deleting file for id " + id, e);
     }


Reply via email to