prateekm commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1288871486
##########
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java:
##########
@@ -93,18 +93,16 @@ public TaskStorageCommitManager(TaskName taskName,
Map<String, TaskBackupManager
this.metrics = metrics;
}
- public void init() {
+ public void init(Checkpoint checkpoint) {
// Assuming that container storage manager has already started and created
to stores
storageEngines = containerStorageManager.getAllStores(taskName);
- if (checkpointManager != null) {
- Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
- LOG.debug("Last checkpoint on start for task: {} is: {}", taskName,
checkpoint);
- stateBackendToBackupManager.values()
- .forEach(storageBackupManager ->
storageBackupManager.init(checkpoint));
- } else {
- stateBackendToBackupManager.values()
- .forEach(storageBackupManager -> storageBackupManager.init(null));
+ final Checkpoint[] finalCheckpoint = new Checkpoint[]{checkpoint};
Review Comment:
Minor: Prefer just naming this latestCheckpoint instead of doing this.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -264,7 +303,7 @@ public CompletableFuture<Void> restoreDir(File baseDir,
DirIndex dirIndex, Metad
List<DirIndex> subDirs = dirIndex.getSubDirsPresent();
for (DirIndex subDir : subDirs) {
File subDirFile = Paths.get(baseDir.getAbsolutePath(),
subDir.getDirName()).toFile();
- downloadFutures.add(restoreDir(subDirFile, subDir, metadata));
+ downloadFutures.add(restoreDir(subDirFile, subDir, metadata, false));
Review Comment:
Blocker: This should use the boolean flag too.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
+ /**
+ * Restores all TaskInstances and returns a future for each TaskInstance
restore. Note: In case of
+ * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag
if it receives a
+ * {@link DeletedException}. This will create a new Checkpoint.
+ */
+ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
Map<TaskName, Checkpoint> taskCheckpoints,
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
JobContext jobContext,
+ ContainerModel containerModel, SamzaContainerMetrics
samzaContainerMetrics, CheckpointManager checkpointManager,
+ Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
+ File loggedStoreDir, Set<String> forceRestoreTask) {
+
+ Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
+ List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+ // Submit restore callable for each taskInstance
+ taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+ long startTime = System.currentTimeMillis();
+ String taskName = taskInstanceName.getTaskName();
+ LOG.info("Starting restore for state for task: {}", taskName);
+
+ CompletableFuture<Void> restoreFuture;
+ if (forceRestoreTask.contains(taskName) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // If init was retried with getDeleted, force restore with
getDeleted as well, since init only inits the
+ // restoreManager with deleted SnapshotIndex but does not retry to
recover the deleted blobs and delegates it
+ // to restore().
+ // Create an empty future that fails immediately with
DeletedException to force retry in restore.
+ restoreFuture = new CompletableFuture<>();
+ restoreFuture.completeExceptionally(new SamzaException(new
DeletedException()));
+ } else {
+ restoreFuture = taskRestoreManager.restore();
+ }
+
+ CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ LOG.warn(
+ "Received DeletedException during restore for task {}.
Reattempting to get deleted blobs",
Review Comment:
Minor: s/Reattempting .../Attempting to restore again with getDeletedBlobs
set to true.
Minor: use same naming convention for the getDeletedBlob flag in the entire
PR (comments, params, logs) for grep-ability. (I see get deleted blobs,
GetDeletedBlob, getDeletedBlob, getDeleted etc.)
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
Review Comment:
Minor: Suggest moving all of these to a new Util class, e.g.
ContainerStorageManagerRestoreUtil
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java:
##########
@@ -139,8 +139,10 @@ public void init(Checkpoint checkpoint) {
// Note: blocks the caller thread.
// TODO LOW shesharma exclude stores that are no longer configured during
init
+ // NOTE: Get SnapshotIndex with getDeleted set to false. A failure to get
a blob from SnapshotIndex would restart
+ // the container and the init()/restore() should be able to create a new
Snapshot in the blob store and recover.
Review Comment:
Minor: Mention that this helps with rare race conditions where the snapshot
index blob got deleted after the restore completed successfully.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java:
##########
@@ -254,7 +269,10 @@ static CompletableFuture<Void> restoreStores(String
jobName, String jobId, TaskN
throw new SamzaException(String.format("Error deleting store
directory: %s", storeDir), e);
}
- boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName,
dirIndex,
+ // If getDeletedBlobs is enabled - always restore so that we get all the
blobs, including the deleted blobs,
Review Comment:
Minor: Run-on sentence. Rephrase for clarity.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -210,6 +203,51 @@ public CompletableFuture<String>
putSnapshotIndex(SnapshotIndex snapshotIndex) {
}, isCauseNonRetriable(), executor, retryPolicyConfig);
}
+ /**
+ * Gets SnapshotIndex blob, cleans up a SnapshotIndex by recursively
deleting all blobs associated with files/subdirs
+ * inside the SnapshotIndex and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob id of SnapshotIndex
+ * @param requestMetadata Metadata of the request
+ * @param getDeletedBlob Determines whether to try to get deleted
SnapshotIndex or not.
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
Metadata requestMetadata, boolean getDeletedBlob) {
+ Metadata getSnapshotRequest = new
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
requestMetadata.getJobName(),
+ requestMetadata.getJobId(), requestMetadata.getTaskName(),
requestMetadata.getStoreName());
+ return getSnapshotIndex(snapshotIndexBlobId, getSnapshotRequest,
getDeletedBlob)
+ .thenCompose(snapshotIndex -> cleanSnapshotIndex(snapshotIndexBlobId,
snapshotIndex, requestMetadata));
+ }
+
+ /**
+ * Cleans up a SnapshotIndex by recursively deleting all blobs associated
with files/subdirs inside the SnapshotIndex
+ * and finally deletes SnapshotIndex blob itself.
+ * @param snapshotIndexBlobId Blob if of SnapshotIndex
+ * @param snapshotIndex SnapshotIndex to delete
+ * @param requestMetadata Metadata of the request
+ */
+ public CompletionStage<Void> cleanSnapshotIndex(String snapshotIndexBlobId,
SnapshotIndex snapshotIndex, Metadata requestMetadata) {
+ DirIndex dirIndex = snapshotIndex.getDirIndex();
+ CompletionStage<Void> storeDeletionFuture =
+ cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs
previously marked for removal
+ .thenComposeAsync(v ->
+ deleteDir(dirIndex, requestMetadata), executor) // deleted
files and dirs still present
+ .thenComposeAsync(v ->
deleteSnapshotIndexBlob(snapshotIndexBlobId, requestMetadata), executor) //
delete the snapshot index blob
+ .exceptionally(ex -> {
+ Throwable unwrappedException =
FutureUtil.unwrapExceptions(CompletionException.class,
+ FutureUtil.unwrapExceptions(SamzaException.class, ex));
+ // If SnapshotIndex is already deleted, do not fail -> this may
happen if after we restore a
Review Comment:
Minor: s/SnapshotIndex/blob, since the error may occur for deleted files as
well.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -229,9 +267,10 @@ public CompletionStage<Void>
deleteSnapshotIndexBlob(String snapshotIndexBlobId,
/**
* Non-blocking restore of a {@link SnapshotIndex} to local store by
downloading all the files and sub-dirs associated
* with this remote snapshot.
+ * NOTE: getDeletedFiles flag sets if it reattempts to get a deleted file by
setting getDeleted flag in getFiles.
* @return A future that completes when all the async downloads completes
*/
- public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex,
Metadata metadata) {
+ public CompletableFuture<Void> restoreDir(File baseDir, DirIndex dirIndex,
Metadata metadata, boolean getDeletedFiles) {
LOG.debug("Restoring contents of directory: {} from remote snapshot.",
baseDir);
Review Comment:
Minor: Log boolean flag.
##########
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala:
##########
@@ -993,10 +1000,21 @@ class SamzaContainer(
})
}
- def startTask {
+ /**
+ * Init all task instances
+ * @param taskCheckpoints last checkpoint for a TaskName. This last
checkpoint could be different from the one returned
+ * from CommitManager#getLastCheckpoint. The new
checkpoint could be created in case the last
Review Comment:
s/CommitManager/CheckpointManager or OffsetManager?
##########
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala:
##########
@@ -149,31 +149,40 @@ class TaskInstance(
}
}
- def initTask {
+ def initTask(lastTaskCheckpoint: Option[Checkpoint]) {
initCaughtUpMapping()
+ val isStandByTask = taskModel.getTaskMode == TaskMode.Standby
+ var checkpoint: Checkpoint = lastTaskCheckpoint.orNull
+
if (commitManager != null) {
debug("Starting commit manager for taskName: %s" format taskName)
-
- commitManager.init()
+ if (isStandByTask) {
+ debug("Standby task: %s Passing null to init for taskName %s:" format
(isStandByTask, taskName))
Review Comment:
Minor: taskName: %s (typo). Ideally remove colons for consistency with next
message.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java:
##########
@@ -327,7 +329,12 @@ public CompletableFuture<Void> cleanUp(CheckpointId
checkpointId, Map<String, St
});
return FutureUtil.allOf(removeTTLFutures, cleanupRemoteSnapshotFutures,
removePrevRemoteSnapshotFutures)
- .whenComplete((res, ex) -> metrics.cleanupNs.update(System.nanoTime()
- startTime));
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Could not finish cleanup. Checkpoint id: {}, store
SCMs: {}", checkpointId, storeSCMs, ex);
Review Comment:
Minor: "checkpointId" for consistency with other logs and grep-ability.
##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -424,7 +464,7 @@ CompletableFuture<Void> getFile(List<FileBlob> fileBlobs,
File fileToRestore, Me
resultFuture = resultFuture.thenComposeAsync(v -> {
LOG.debug("Starting restore for file: {} with blob id: {} at offset:
{}", fileToRestore, fileBlob.getBlobId(),
Review Comment:
Minor: Log boolean flag too.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -301,79 +303,28 @@ private void restoreStores() throws InterruptedException {
samzaContainerMetrics, taskInstanceMetrics,
taskInstanceCollectors, serdes,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config,
clock);
taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+ taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});
- // Initialize each TaskStorageManager
- taskRestoreManagers.forEach((taskName, restoreManagers) ->
- restoreManagers.forEach((factoryName, taskRestoreManager) ->
- taskRestoreManager.init(taskCheckpoints.get(taskName))
- )
- );
-
- // Start each store consumer once.
- // Note: These consumers are per system and only changelog system store
consumers will be started.
- // Some TaskRestoreManagers may not require the consumer to to be started,
but due to the agnostic nature of
- // ContainerStorageManager we always start the changelog consumer here in
case it is required
-
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
- List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
- // Submit restore callable for each taskInstance
- taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
- // Submit for each restore factory
- restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
- long startTime = System.currentTimeMillis();
- String taskName = taskInstance.getTaskName();
- LOG.info("Starting restore for state for task: {}", taskName);
- CompletableFuture<Void> restoreFuture =
taskRestoreManager.restore().handle((res, ex) -> {
- // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {}
restore",
- taskName, ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be be able to
continue processing/backups
- // if restore manager close fails.
- }
-
- long timeToRestore = System.currentTimeMillis() - startTime;
- if (samzaContainerMetrics != null) {
- Gauge taskGauge =
samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance,
null);
-
- if (taskGauge != null) {
- taskGauge.set(timeToRestore);
- }
- }
-
- if (ex != null) {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked exception
to throw from lambda
- } else {
- return null;
- }
- });
-
- taskRestoreFutures.add(restoreFuture);
- });
- });
+ // Init all taskRestores and if successful, restores all the task stores
concurrently
+ CompletableFuture<Map<TaskName, Checkpoint>>
initRestoreAndNewCheckpointFuture =
+
ContainerStorageManagerUtil.initAndRestoreTaskInstances(taskRestoreManagers,
samzaContainerMetrics,
+ checkpointManager, jobContext, containerModel, taskCheckpoints,
taskBackendFactoryToStoreNames, config,
+ restoreExecutor, taskInstanceMetrics, loggedStoreBaseDirectory,
storeConsumers);
- // Loop-over the future list to wait for each restore to finish, catch any
exceptions during restore and throw
- // as samza exceptions
- for (Future<Void> future : taskRestoreFutures) {
- try {
- future.get();
- } catch (InterruptedException e) {
- LOG.warn("Received an interrupt during store restoration. Interrupting
the restore executor to exit "
- + "prematurely without restoring full state.");
- restoreExecutor.shutdownNow();
- throw e;
- } catch (Exception e) {
- LOG.error("Exception when restoring state.", e);
- throw new SamzaException("Exception when restoring state.", e);
- }
+ // Update the task checkpoints map, if it was updated during the restore.
Throw an exception if the restore or
+ // creating a new checkpoint (in case of BlobStoreBackendFactory) failed.
+ try {
+ Map<TaskName, Checkpoint> newTaskCheckpoints =
initRestoreAndNewCheckpointFuture.get();
+ taskCheckpoints.putAll(newTaskCheckpoints);
Review Comment:
Minor: Can return the newTaskCheckpoints map directly instead of copying
over.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
+ /**
+ * Restores all TaskInstances and returns a future for each TaskInstance
restore. Note: In case of
+ * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag
if it receives a
+ * {@link DeletedException}. This will create a new Checkpoint.
+ */
+ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
Map<TaskName, Checkpoint> taskCheckpoints,
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
JobContext jobContext,
+ ContainerModel containerModel, SamzaContainerMetrics
samzaContainerMetrics, CheckpointManager checkpointManager,
+ Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
+ File loggedStoreDir, Set<String> forceRestoreTask) {
+
+ Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
+ List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+ // Submit restore callable for each taskInstance
+ taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+ long startTime = System.currentTimeMillis();
+ String taskName = taskInstanceName.getTaskName();
+ LOG.info("Starting restore for state for task: {}", taskName);
+
+ CompletableFuture<Void> restoreFuture;
+ if (forceRestoreTask.contains(taskName) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // If init was retried with getDeleted, force restore with
getDeleted as well, since init only inits the
+ // restoreManager with deleted SnapshotIndex but does not retry to
recover the deleted blobs and delegates it
+ // to restore().
+ // Create an empty future that fails immediately with
DeletedException to force retry in restore.
+ restoreFuture = new CompletableFuture<>();
+ restoreFuture.completeExceptionally(new SamzaException(new
DeletedException()));
+ } else {
+ restoreFuture = taskRestoreManager.restore();
+ }
+
+ CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ LOG.warn(
+ "Received DeletedException during restore for task {}.
Reattempting to get deleted blobs",
+ taskInstanceName.getTaskName());
+
+ // Try to restore with getDeleted flag
+ CompletableFuture<Checkpoint> future =
+ restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
+ checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
+
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
+ jobContext, containerModel);
+ try {
+ if (future != null) {
Review Comment:
Minor: Can this be null? If not, simplify.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
+ /**
+ * Restores all TaskInstances and returns a future for each TaskInstance
restore. Note: In case of
+ * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag
if it receives a
+ * {@link DeletedException}. This will create a new Checkpoint.
+ */
+ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
Map<TaskName, Checkpoint> taskCheckpoints,
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
JobContext jobContext,
+ ContainerModel containerModel, SamzaContainerMetrics
samzaContainerMetrics, CheckpointManager checkpointManager,
+ Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
+ File loggedStoreDir, Set<String> forceRestoreTask) {
+
+ Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
+ List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+ // Submit restore callable for each taskInstance
+ taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+ long startTime = System.currentTimeMillis();
+ String taskName = taskInstanceName.getTaskName();
+ LOG.info("Starting restore for state for task: {}", taskName);
+
+ CompletableFuture<Void> restoreFuture;
+ if (forceRestoreTask.contains(taskName) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // If init was retried with getDeleted, force restore with
getDeleted as well, since init only inits the
+ // restoreManager with deleted SnapshotIndex but does not retry to
recover the deleted blobs and delegates it
+ // to restore().
+ // Create an empty future that fails immediately with
DeletedException to force retry in restore.
+ restoreFuture = new CompletableFuture<>();
+ restoreFuture.completeExceptionally(new SamzaException(new
DeletedException()));
+ } else {
+ restoreFuture = taskRestoreManager.restore();
+ }
+
+ CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ LOG.warn(
+ "Received DeletedException during restore for task {}.
Reattempting to get deleted blobs",
+ taskInstanceName.getTaskName());
+
+ // Try to restore with getDeleted flag
+ CompletableFuture<Checkpoint> future =
+ restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
+ checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
+
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
+ jobContext, containerModel);
+ try {
+ if (future != null) {
+ newTaskCheckpoints.put(taskInstanceName, future);
+ }
+ } catch (Exception e) {
+ String msg =
Review Comment:
Minor: This should say "Error restoring deleted snapshot with
getDeletedBlobs set to true" or something. We don't know it's a DeletedException.
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
+ /**
+ * Restores all TaskInstances and returns a future for each TaskInstance
restore. Note: In case of
+ * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag
if it receives a
+ * {@link DeletedException}. This will create a new Checkpoint.
+ */
+ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
Map<TaskName, Checkpoint> taskCheckpoints,
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
JobContext jobContext,
+ ContainerModel containerModel, SamzaContainerMetrics
samzaContainerMetrics, CheckpointManager checkpointManager,
+ Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
+ File loggedStoreDir, Set<String> forceRestoreTask) {
+
+ Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
+ List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+ // Submit restore callable for each taskInstance
+ taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+ long startTime = System.currentTimeMillis();
+ String taskName = taskInstanceName.getTaskName();
+ LOG.info("Starting restore for state for task: {}", taskName);
+
+ CompletableFuture<Void> restoreFuture;
+ if (forceRestoreTask.contains(taskName) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // If init was retried with getDeleted, force restore with
getDeleted as well, since init only inits the
+ // restoreManager with deleted SnapshotIndex but does not retry to
recover the deleted blobs and delegates it
+ // to restore().
+ // Create an empty future that fails immediately with
DeletedException to force retry in restore.
+ restoreFuture = new CompletableFuture<>();
+ restoreFuture.completeExceptionally(new SamzaException(new
DeletedException()));
+ } else {
+ restoreFuture = taskRestoreManager.restore();
+ }
+
+ CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ LOG.warn(
+ "Received DeletedException during restore for task {}.
Reattempting to get deleted blobs",
+ taskInstanceName.getTaskName());
+
+ // Try to restore with getDeleted flag
+ CompletableFuture<Checkpoint> future =
+ restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
+ checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
+
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
+ jobContext, containerModel);
+ try {
+ if (future != null) {
+ newTaskCheckpoints.put(taskInstanceName, future);
+ }
+ } catch (Exception e) {
Review Comment:
Minor: Should the restoreDeletedSnapshot method call be in try-catch too to
capture any synchronous exceptions?
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
+ /**
+ * Restores all TaskInstances and returns a future for each TaskInstance
restore. Note: In case of
+ * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag
if it receives a
+ * {@link DeletedException}. This will create a new Checkpoint.
+ */
+ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
Map<TaskName, Checkpoint> taskCheckpoints,
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
JobContext jobContext,
+ ContainerModel containerModel, SamzaContainerMetrics
samzaContainerMetrics, CheckpointManager checkpointManager,
+ Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
+ File loggedStoreDir, Set<String> forceRestoreTask) {
+
+ Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
+ List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+ // Submit restore callable for each taskInstance
+ taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+ long startTime = System.currentTimeMillis();
+ String taskName = taskInstanceName.getTaskName();
+ LOG.info("Starting restore for state for task: {}", taskName);
+
+ CompletableFuture<Void> restoreFuture;
+ if (forceRestoreTask.contains(taskName) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // If init was retried with getDeleted, force restore with
getDeleted as well, since init only inits the
+ // restoreManager with deleted SnapshotIndex but does not retry to
recover the deleted blobs and delegates it
+ // to restore().
+ // Create an empty future that fails immediately with
DeletedException to force retry in restore.
+ restoreFuture = new CompletableFuture<>();
+ restoreFuture.completeExceptionally(new SamzaException(new
DeletedException()));
+ } else {
+ restoreFuture = taskRestoreManager.restore();
+ }
+
+ CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ LOG.warn(
+ "Received DeletedException during restore for task {}.
Reattempting to get deleted blobs",
+ taskInstanceName.getTaskName());
+
+ // Try to restore with getDeleted flag
+ CompletableFuture<Checkpoint> future =
+ restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
+ checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
+
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
+ jobContext, containerModel);
+ try {
+ if (future != null) {
+ newTaskCheckpoints.put(taskInstanceName, future);
+ }
+ } catch (Exception e) {
+ String msg =
+ String.format("DeletedException during restore task: %s
after retrying to get deleted blobs.", taskName);
+ throw new SamzaException(msg, e);
+ } finally {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+ }
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("Error restoring state for task: %s",
taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex); // wrap in unchecked
exception to throw from lambda
+ }
+ }
+
+ // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
+ // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
+ try {
+ taskRestoreManager.close();
+ } catch (Exception e) {
+ LOG.error("Error closing restore manager for task: {} after {}
restore", taskName,
+ ex != null ? "unsuccessful" : "successful", e);
+ // ignore exception from close. container may still be able to
continue processing/backups
+ // if restore manager close fails.
+ }
+ return null;
+ });
+ taskRestoreFutures.add(taskRestoreFuture);
+ });
+ });
+ CompletableFuture<Void> restoreFutures =
CompletableFuture.allOf(taskRestoreFutures.toArray(new CompletableFuture[0]));
+ return restoreFutures.thenCompose(ignoredVoid ->
FutureUtil.toFutureOfMap(newTaskCheckpoints));
+ }
+
+ /**
+ * Returns a single future that guarantees all the following are completed,
in this order:
+ * 1. Restore state locally by getting deleted blobs from the blob store.
+ * 2. Create a new snapshot from restored state by backing it up on the
blob store.
+ * 3. Remove TTL from the new Snapshot and all the associated blobs in the
blob store
+ * 4. Clean up old/deleted Snapshot
+ * 5. Create and write the new checkpoint to checkpoint topic
+ */
+ private static CompletableFuture<Checkpoint> restoreDeletedSnapshot(TaskName
taskName,
+ Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager
checkpointManager,
+ TaskRestoreManager taskRestoreManager, Config config, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ ExecutorService executor, Set<String> storesToRestore, File
loggedStoreBaseDirectory, JobContext jobContext,
+ ContainerModel containerModel) {
+
+ // if taskInstanceMetrics are specified use those for store metrics,
+ // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
+ MetricsRegistry metricsRegistry =
+ taskInstanceMetrics.get(taskName) != null ?
taskInstanceMetrics.get(taskName).registry()
+ : new MetricsRegistryMap();
+
+ BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
+ JobConfig jobConfig = new JobConfig(config);
+ BlobStoreUtil blobStoreUtil =
+ new BlobStoreUtil(blobStoreManager, executor, new
BlobStoreConfig(config), null,
+ new BlobStoreRestoreManagerMetrics(metricsRegistry));
+
+ BlobStoreRestoreManager blobStoreRestoreManager =
(BlobStoreRestoreManager) taskRestoreManager;
+
+ CheckpointId checkpointId = CheckpointId.create();
+ Map<String, String> oldSCMs = ((CheckpointV2)
taskCheckpoints.get(taskName)).getStateCheckpointMarkers()
+ .get(BlobStoreStateBackendFactory.class.getName());
+
+ // 1. Restore state with getDeleted flag set to true
+ CompletableFuture<Void> restoreFuture =
blobStoreRestoreManager.restore(true);
+
+ // 2. Create a new checkpoint and back it up on the blob store
+ CompletableFuture<Map<String, String>> backupStoresFuture =
restoreFuture.thenCompose(
+ r -> backupRecoveredStore(jobContext, containerModel, config, taskName,
storesToRestore, checkpointId,
+ loggedStoreBaseDirectory, blobStoreManager, metricsRegistry,
executor));
+
+ // 3. Mark new Snapshots to never expire
+ CompletableFuture<Void> removeNewSnapshotsTTLFuture =
backupStoresFuture.thenCompose(
+ storeSCMs -> {
+ List<CompletableFuture<Void>> removeSnapshotTTLFutures = new
ArrayList<>();
+ storeSCMs.forEach((store, scm) -> {
+ Metadata requestMetadata = new
Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
+ jobConfig.getName().get(), jobConfig.getJobId(),
taskName.getTaskName(), store);
+
removeSnapshotTTLFutures.add(blobStoreUtil.getSnapshotIndexAndRemoveTTL(scm,
requestMetadata));
Review Comment:
Minor: Should rename util method to "removeTTLForSnapshotIndex" (use same
naming convention as regular cleanup) to clarify whether "for SnapshotIndex"
includes its files or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]