prateekm commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1287527272


##########
samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java:
##########
@@ -49,11 +49,12 @@ public interface BlobStoreManager {
    * @param id Blob ID of the blob to get
    * @param outputStream OutputStream to write the downloaded blob
    * @param metadata User supplied {@link Metadata} of the request
+   * @param getDeletedBlob Flag to indicate if get should try to get a blob 
marked for deletion but not yet compacted
    * @return A future that completes when all the chunks are downloaded and 
written successfully to the OutputStream
    * @throws org.apache.samza.storage.blobstore.exceptions.DeletedException 
returned future should complete
    *         exceptionally with DeletedException on failure with the blob 
already deleted error.
    */
-  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata 
metadata);
+  CompletionStage<Void> get(String id, OutputStream outputStream, Metadata 
metadata, Boolean getDeletedBlob);

Review Comment:
   Minor: use the primitive `boolean` rather than the boxed `Boolean`.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, 
ExecutorService restoreExecu
 
   @Override
   public void init(Checkpoint checkpoint) {
+    init(checkpoint, false);
+  }
+
+  public void init(Checkpoint checkpoint, Boolean getDeletedBlob) {

Review Comment:
   Minor: boolean everywhere (vs Boolean).
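
   To illustrate the concern, here is a minimal sketch (not code from this PR; `BoxedFlagDemo` and its methods are hypothetical names) of why a boxed flag is riskier: a `Boolean` parameter accepts `null`, which throws on auto-unboxing, while a primitive `boolean` rules that out at compile time.

```java
class BoxedFlagDemo {
  // Boxed parameter: callers may pass null, which fails when the value is unboxed.
  static String describeBoxed(Boolean getDeletedBlob) {
    return getDeletedBlob ? "include deleted" : "skip deleted";
  }

  // Primitive parameter: passing null does not compile, so no runtime surprise.
  static String describePrimitive(boolean getDeletedBlob) {
    return getDeletedBlob ? "include deleted" : "skip deleted";
  }

  public static void main(String[] args) {
    System.out.println(describePrimitive(false));
    try {
      describeBoxed(null);
    } catch (NullPointerException e) {
      System.out.println("NullPointerException when unboxing a null Boolean");
    }
  }
}
```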



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java:
##########
@@ -139,8 +139,10 @@ public void init(Checkpoint checkpoint) {
 
     // Note: blocks the caller thread.
     // TODO LOW shesharma exclude stores that are no longer configured during 
init
+    // NOTE: Always get SnapshotIndex with getDeleted enabled. A failure to 
get a blob from SnapshotIndex would restart

Review Comment:
   Minor: The comment isn't very clear. It doesn't explain _why_ this should be 
true, only (if I understand correctly) what happens when this is true and there 
is still a failure.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -212,9 +213,17 @@ static CompletableFuture<Void> restoreStores(String 
jobName, String jobId, TaskN
       File loggedBaseDir, StorageConfig storageConfig, 
BlobStoreRestoreManagerMetrics metrics,
       StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, 
DirDiffUtil dirDiffUtil,
       ExecutorService executor) {
+    return restoreStores(jobName, jobId, taskName, storesToRestore, 
prevStoreSnapshotIndexes, loggedBaseDir, storageConfig,
+        metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, 
false);
+  }
+
+  public static CompletableFuture<Void> restoreStores(String jobName, String 
jobId, TaskName taskName, Set<String> storesToRestore,

Review Comment:
   Minor: Why public? Make it package-private and annotate with @VisibleForTesting?



##########
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java:
##########
@@ -93,14 +93,18 @@ public TaskStorageCommitManager(TaskName taskName, 
Map<String, TaskBackupManager
     this.metrics = metrics;
   }
 
-  public void init() {
+  public void init(Checkpoint checkpoint) {
     // Assuming that container storage manager has already started and created 
to stores
     storageEngines = containerStorageManager.getAllStores(taskName);
     if (checkpointManager != null) {
-      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      // In case of standby containers

Review Comment:
   Minor: Explain inline why this is the case for standby containers.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,8 +263,10 @@ static CompletableFuture<Void> restoreStores(String 
jobName, String jobId, TaskN
         throw new SamzaException(String.format("Error deleting store 
directory: %s", storeDir), e);
       }
 
+      // If getDeletedBlobs is enabled - always restore so that we get all the 
blobs, including the deleted blobs,
+      // immediately restore it locally and backup to create new checkpoint.

Review Comment:
   Minor: "backup to create .." is not happening here; please clarify the comment.



##########
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java:
##########
@@ -93,14 +93,18 @@ public TaskStorageCommitManager(TaskName taskName, 
Map<String, TaskBackupManager
     this.metrics = metrics;
   }
 
-  public void init() {
+  public void init(Checkpoint checkpoint) {
     // Assuming that container storage manager has already started and created 
to stores
     storageEngines = containerStorageManager.getAllStores(taskName);
     if (checkpointManager != null) {
-      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      // In case of standby containers
+      if (checkpoint == null) {
+        checkpoint = checkpointManager.readLastCheckpoint(taskName);

Review Comment:
   Minor: prefer not reassigning the input parameter. Create finalCheckpoint 
(maybe rename it to latestCheckpoint) earlier and assign/reassign to that instead.
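
   Roughly what I have in mind, as a sketch only: `CheckpointFallbackDemo`, `resolveCheckpoint`, and the `Supplier` are hypothetical stand-ins (a `String` replaces `Checkpoint`, and the supplier replaces `checkpointManager.readLastCheckpoint(taskName)`). The parameter stays untouched and the fallback goes into a separate local.

```java
import java.util.function.Supplier;

class CheckpointFallbackDemo {
  // Resolves the checkpoint to use without reassigning the input parameter.
  // The supplier is only invoked when no checkpoint was passed in.
  static String resolveCheckpoint(final String checkpoint, Supplier<String> readLastCheckpoint) {
    final String latestCheckpoint = checkpoint != null ? checkpoint : readLastCheckpoint.get();
    return latestCheckpoint;
  }

  public static void main(String[] args) {
    System.out.println(resolveCheckpoint("passed-in", () -> "read-from-manager"));
    System.out.println(resolveCheckpoint(null, () -> "read-from-manager"));
  }
}
```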



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> 
putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated 
with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting 
the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated 
with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting 
the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
Metadata requestMetadata, Boolean getDeletedBlob) {
+    Metadata getSnapshotRequest = new 
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), 
requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), 
requestMetadata.getStoreName());
+    SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, 
getSnapshotRequest, getDeletedBlob).join();
+    return cleanSnapshotIndex(snapshotIndexBlobId, snapshotIndex, 
requestMetadata);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated 
with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param snapshotIndex SnapshotIndex to delete
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+    DirIndex dirIndex = snapshotIndex.getDirIndex();
+    CompletionStage<Void> storeDeletionFuture =
+        cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs 
previously marked for removal
+            .thenComposeAsync(v ->
+                deleteDir(dirIndex, requestMetadata), executor) // deleted 
files and dirs still present
+            .thenComposeAsync(v -> 
deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) // 
delete the snapshot index blob
+            .exceptionally(ex -> {
+              if (ex instanceof DeletedException) {

Review Comment:
   Doesn't look like it's fixed.



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {
     initCaughtUpMapping()
 
+    val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+
     if (commitManager != null) {
       debug("Starting commit manager for taskName: %s" format taskName)
-
-      commitManager.init()
+      commitManager.init(if (isStandByTask) null else lastTaskCheckpoint)

Review Comment:
   Doesn't seem to be fixed. Missing the latest commit?



##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -981,7 +981,7 @@ class SamzaContainer(
     }
   }
 
-  def startStores {
+  def startStores: util.Map[TaskName, Checkpoint] = {

Review Comment:
   Doesn't seem to be fixed.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -114,6 +114,11 @@ public BlobStoreUtil(BlobStoreManager blobStoreManager, 
ExecutorService executor
    */
   public Map<String, Pair<String, SnapshotIndex>> getStoreSnapshotIndexes(
       String jobName, String jobId, String taskName, Checkpoint checkpoint, 
Set<String> storesToBackupOrRestore) {
+    return getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, 
storesToBackupOrRestore, false);
+  }
+
+  public Map<String, Pair<String, SnapshotIndex>> 
getStoreSnapshotIndexes(String jobName, String jobId, String taskName,

Review Comment:
   See comments in BSRM and BSBM init. When do we need to get snapshot indexes 
with getDeletedBlobs = false?



##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,33 @@ class TaskInstance(
     }
   }
 
-  def initTask {
+  def initTask(lastTaskCheckpoint: Checkpoint) {

Review Comment:
   Doesn't seem to be fixed.



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
     }
     return sideInputStores;
   }
+
+  public static List<Future<Void>> initAndRestoreTaskInstances(
+      Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers, 
SamzaContainerMetrics samzaContainerMetrics,
+      CheckpointManager checkpointManager, JobContext jobContext, 
ContainerModel containerModel,
+      Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String, 
Set<String>>> taskBackendFactoryToStoreNames,
+      Config config, ExecutorService executor, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+    Set<String> forceRestoreTasks = new HashSet<>();
+    // Initialize each TaskStorageManager.
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+          try {
+            taskRestoreManager.init(taskCheckpoints.get(taskName));
+          } catch (SamzaException ex) {
+            if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager 
instanceof BlobStoreRestoreManager) {
+              // Get deleted SnapshotIndex blob with GetDeleted and mark the 
task to be restored with GetDeleted as well.
+              // this ensures that the restore downloads the snapshot, 
recreates a new snapshot, uploads it to blob store
+              // and creates a new checkpoint.
+              ((BlobStoreRestoreManager) 
taskRestoreManager).init(taskCheckpoints.get(taskName), true);

Review Comment:
   See comments above in BSRM and BSBM init. We may not need to call init twice 
(or with getDeleted) at all.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +216,56 @@ public CompletableFuture<String> 
putSnapshotIndex(SnapshotIndex snapshotIndex) {
     }, isCauseNonRetriable(), executor, retryPolicyConfig);
   }
 
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated 
with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting 
the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
Metadata requestMetadata) {
+    return cleanSnapshotIndex(snapshotIndexBlobId, requestMetadata, false);
+  }
+
+  /**
+   * Cleans up a SnapshotIndex by recursively deleting all blobs associated 
with files/subdirs inside the SnapshotIndex
+   * and finally deletes SnapshotIndex blob itself. This is done by getting 
the SnapshotIndex first.
+   * @param snapshotIndexBlobId Blob if of SnapshotIndex
+   * @param requestMetadata Metadata of the request
+   * @param getDeletedBlob Gets SnapshotIndex with getDeleted flag set
+   */
+  public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId, 
Metadata requestMetadata, Boolean getDeletedBlob) {
+    Metadata getSnapshotRequest = new 
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), 
requestMetadata.getJobName(),
+        requestMetadata.getJobId(), requestMetadata.getTaskName(), 
requestMetadata.getStoreName());
+    SnapshotIndex snapshotIndex = getSnapshotIndex(snapshotIndexBlobId, 
getSnapshotRequest, getDeletedBlob).join();

Review Comment:
   Why join instead of chaining the next call?
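
   For reference, a minimal sketch of the chained form. The names `ChainInsteadOfJoinDemo`, `fetchIndex`, `cleanWithIndex`, and `cleanByBlobId` are hypothetical stand-ins for the real `getSnapshotIndex` and `cleanSnapshotIndex` overloads, and a `String` replaces `SnapshotIndex`. The point is only that `thenCompose` keeps the method non-blocking, whereas `join()` blocks the caller thread before returning a future. The real code would presumably use `thenComposeAsync` with the existing executor.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;

class ChainInsteadOfJoinDemo {
  // Stand-in for the async fetch of the snapshot index.
  static CompletableFuture<String> fetchIndex(String blobId) {
    return CompletableFuture.supplyAsync(() -> "index-for-" + blobId);
  }

  // Stand-in for the overload that cleans up an already-fetched index.
  static CompletionStage<Void> cleanWithIndex(String blobId, String index, Queue<String> log) {
    return CompletableFuture.runAsync(() -> log.add("cleaned " + index));
  }

  // Chains the cleanup onto the fetch instead of calling join() in between.
  static CompletionStage<Void> cleanByBlobId(String blobId, Queue<String> log) {
    return fetchIndex(blobId).thenCompose(index -> cleanWithIndex(blobId, index, log));
  }

  public static void main(String[] args) {
    Queue<String> log = new ConcurrentLinkedQueue<>();
    // join() here is only for the demo; callers would normally keep chaining.
    cleanByBlobId("blob-1", log).toCompletableFuture().join();
    System.out.println(log);
  }
}
```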



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -232,6 +288,15 @@ public CompletionStage<Void> 
deleteSnapshotIndexBlob(String snapshotIndexBlobId,
    * @return A future that completes when all the async downloads completes
    */
   public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex, 
Metadata metadata) {
+    return restoreDir(baseDir, dirIndex, metadata, false);
+  }
+
+  /**
+   * Non-blocking restore of a {@link SnapshotIndex} to local store by 
downloading all the files and sub-dirs associated
+   * with this remote snapshot. getDeletedFiles flag sets whether to attempt a 
get for deletedFiles or not.
+   * @return A future that completes when all the async downloads completes
+   */
+  public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex, 
Metadata metadata, Boolean getDeletedFiles) {

Review Comment:
   Don't need 2 variants for the util method.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -119,13 +119,18 @@ public BlobStoreRestoreManager(TaskModel taskModel, 
ExecutorService restoreExecu
 
   @Override
   public void init(Checkpoint checkpoint) {
+    init(checkpoint, false);
+  }
+
+  public void init(Checkpoint checkpoint, Boolean getDeletedBlob) {
     long startTime = System.nanoTime();
     LOG.debug("Initializing blob store restore manager for task: {}", 
taskName);
 
     blobStoreManager.init();
 
     // get previous SCMs from checkpoint
-    prevStoreSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes(jobName, 
jobId, taskName, checkpoint, storesToRestore);
+    prevStoreSnapshotIndexes =
+        blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, 
checkpoint, storesToRestore, getDeletedBlob);

Review Comment:
   Why not always get snapshot index with getDeletedBlob = true in init (and 
remove this variant)?
   
   IIUC (correct me if wrong):
   1. We only getStoreSnapshotIndex twice during container startup, once in 
BSRestoreManager, once in BSBackupManager.
   2. Either both requests with getDeleted will succeed, or not.
           a. If both succeed, then regardless of whether the snapshot index 
blob is deleted or not, restore and future backups will continue uninterrupted. 
If the restore for an actual file in the snapshot fails, then BSRM will retry 
the entire restore() with getDeletedBlob = true with the same snapshot index.
           b. If either fails, then the snapshot index blob is gone for good 
and there is nothing we can do.
           
    So why do we need to let the first BSRM init request fail for a deleted 
snapshot index blob?
           



##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -139,6 +139,17 @@ public static <K, V> CompletableFuture<Map<K, V>> 
toFutureOfMap(
     });
   }
 
+  /**
+   * Helper method to convert: {@code Map<K, CompletionStage<Void>>}
+   * to:                       {@code CompletableFuture<Void>}
+   *
+   * Returns a future that completes when all value futures complete.
+   * Returned future completes exceptionally if any of the value futures 
complete exceptionally.
+   */
+  public static CompletableFuture<Void> mapToFuture(Map<String, 
CompletionStage<Void>> map) {

Review Comment:
   Can we not just ignore the return value in the previous method?
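
   Something along these lines, as a sketch under assumptions: `IgnoreMapResultDemo` is a hypothetical class, `toFutureOfMap` below is a simplified stand-in and not the actual `FutureUtil` implementation or signature, and `awaitAll` is a made-up name. It shows that the map result can be dropped with `thenAccept` while still propagating failures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class IgnoreMapResultDemo {
  // Simplified stand-in for FutureUtil.toFutureOfMap (assumed behavior, not the real code):
  // completes with a map of results once every value future has completed.
  static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(Map<K, CompletionStage<V>> futures) {
    CompletableFuture<?>[] all = futures.values().stream()
        .map(CompletionStage::toCompletableFuture)
        .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(all).thenApply(v -> {
      Map<K, V> result = new HashMap<>();
      // Every future is already complete here, so join() does not block.
      futures.forEach((k, f) -> result.put(k, f.toCompletableFuture().join()));
      return result;
    });
  }

  // Reuses the map-returning helper and simply discards the map.
  static <K> CompletableFuture<Void> awaitAll(Map<K, CompletionStage<Void>> futures) {
    return toFutureOfMap(futures).thenAccept(ignored -> { });
  }

  public static void main(String[] args) {
    Map<String, CompletionStage<Void>> futures = new HashMap<>();
    futures.put("store-a", CompletableFuture.completedFuture(null));
    futures.put("store-b", CompletableFuture.runAsync(() -> { }));
    awaitAll(futures).join();
    System.out.println("all value futures completed");
  }
}
```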



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,8 +263,10 @@ static CompletableFuture<Void> restoreStores(String 
jobName, String jobId, TaskN
         throw new SamzaException(String.format("Error deleting store 
directory: %s", storeDir), e);
       }
 
+      // If getDeletedBlobs is enabled - always restore so that we get all the 
blobs, including the deleted blobs,
+      // immediately restore it locally and backup to create new checkpoint.
       boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, 
dirIndex,
-          storeCheckpointDir, storageConfig, dirDiffUtil);
+          storeCheckpointDir, storageConfig, dirDiffUtil) || getDeletedBlobs;

Review Comment:
   If shouldRestore returned false during first restore, there would have been 
no restore, no failures, and we would not be restoring with getDeleted = true. 
   
   E.g. if the last checkpoint directory was already present and was identical 
to remote snapshot, it would be copied over to the store directory and restore 
would be skipped. 
   
   It seems like this would be an issue, since the files could actually be 
deleted in remote store, but restore manager wouldn't realize it, and backup 
manager would only upload the delta based on the latest snapshot index. So the 
files could eventually get compacted.
   
   So do we need to remove the shouldRestore check and always force a restore 
upon startup to detect correctly if we need to reupload?
   
   Can sync up offline if I'm misunderstanding something.


