shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1289227259
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java:
##########
@@ -411,4 +438,292 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}
+
+ /**
+ * Inits and Restores all the task stores.
+ * Note: In case of {@link BlobStoreRestoreManager}, this method retries
init and restore with getDeleted flag if it
+ * receives a {@link DeletedException}. This will create a new checkpoint
for the corresponding task.
+ */
+ public static CompletableFuture<Map<TaskName, Checkpoint>>
initAndRestoreTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
SamzaContainerMetrics samzaContainerMetrics,
+ CheckpointManager checkpointManager, JobContext jobContext,
ContainerModel containerModel,
+ Map<TaskName, Checkpoint> taskCheckpoints, Map<TaskName, Map<String,
Set<String>>> taskBackendFactoryToStoreNames,
+ Config config, ExecutorService executor, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ File loggerStoreDir, Map<String, SystemConsumer> storeConsumers) {
+
+ Set<String> forceRestoreTasks = new HashSet<>();
+ // Initialize each TaskStorageManager.
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) -> {
+ try {
+ taskRestoreManager.init(taskCheckpoints.get(taskName));
+ } catch (SamzaException ex) {
+ if (isUnwrappedExceptionDeletedException(ex) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // Get deleted SnapshotIndex blob with GetDeleted and mark the
task to be restored with GetDeleted as well.
+ // this ensures that the restore downloads the snapshot,
recreates a new snapshot, uploads it to blob store
+ // and creates a new checkpoint.
+ ((BlobStoreRestoreManager)
taskRestoreManager).init(taskCheckpoints.get(taskName), true);
+ forceRestoreTasks.add(taskName.getTaskName());
+ } else {
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("init failed for
BlobStoreRestoreManager, task: %s with GetDeleted set to true", taskName);
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex);
+ }
+ }
+ })
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store
consumers will be started.
+ // Some TaskRestoreManagers may not require the consumer to be started,
but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in
case it is required
+ storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
+
+ return restoreAllTaskInstances(taskRestoreManagers, taskCheckpoints,
taskBackendFactoryToStoreNames, jobContext,
+ containerModel, samzaContainerMetrics, checkpointManager, config,
taskInstanceMetrics, executor, loggerStoreDir,
+ forceRestoreTasks);
+ }
+
+ /**
+ * Restores all TaskInstances and returns a future for each TaskInstance
restore. Note: In case of
+ * {@link BlobStoreRestoreManager}, this method restore with getDeleted flag
if it receives a
+ * {@link DeletedException}. This will create a new Checkpoint.
+ */
+ private static CompletableFuture<Map<TaskName, Checkpoint>>
restoreAllTaskInstances(
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers,
Map<TaskName, Checkpoint> taskCheckpoints,
+ Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames,
JobContext jobContext,
+ ContainerModel containerModel, SamzaContainerMetrics
samzaContainerMetrics, CheckpointManager checkpointManager,
+ Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
+ File loggedStoreDir, Set<String> forceRestoreTask) {
+
+ Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
+ List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+
+ // Submit restore callable for each taskInstance
+ taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
+ long startTime = System.currentTimeMillis();
+ String taskName = taskInstanceName.getTaskName();
+ LOG.info("Starting restore for state for task: {}", taskName);
+
+ CompletableFuture<Void> restoreFuture;
+ if (forceRestoreTask.contains(taskName) && taskRestoreManager
instanceof BlobStoreRestoreManager) {
+ // If init was retried with getDeleted, force restore with
getDeleted as well, since init only inits the
+ // restoreManager with deleted SnapshotIndex but does not retry to
recover the deleted blobs and delegates it
+ // to restore().
+ // Create an empty future that fails immediately with
DeletedException to force retry in restore.
+ restoreFuture = new CompletableFuture<>();
+ restoreFuture.completeExceptionally(new SamzaException(new
DeletedException()));
+ } else {
+ restoreFuture = taskRestoreManager.restore();
+ }
+
+ CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
+ updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
+
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ LOG.warn(
+ "Received DeletedException during restore for task {}.
Reattempting to get deleted blobs",
Review Comment:
Updated all occurrences in logs, params, etc. to getDeleted. Also added it
to more logs for better traceability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]