shekhars-li commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1297851046
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
+ future.whenComplete((r, e) -> {
updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
- }
+ closeTaskRestoreManager(taskRestoreManager, taskName);
+ });
+ newTaskCheckpoints.put(taskInstanceName, future);
} else {
// log and rethrow exception to communicate restore failure
String msg = String.format("Error restoring state for task: %s", taskName);
LOG.error(msg, ex);
throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
}
- }
-
- // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
- ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be able to continue processing/backups
- // if restore manager close fails.
+ } else {
+ // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+ // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
+ // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed
+ // before async restoreDeletedSnapshot() is complete.
+ closeTaskRestoreManager(taskRestoreManager, taskName);
Review Comment:
Updated that. Thanks
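
For reference, the ordering concern in the NOTE above can be sketched in isolation. This is only an illustration, not the code in this PR: `RestoreCloseOrdering` and `FakeRestoreManager` are hypothetical names, and the manually completed future stands in for whatever the asynchronous retry returns. It assumes only the standard `CompletableFuture.whenComplete` behavior, where the callback runs once the future completes, on success or failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class RestoreCloseOrdering {
    /** Hypothetical stand-in for a restore manager; it only records when close() runs. */
    static final class FakeRestoreManager {
        private final List<String> events;

        FakeRestoreManager(List<String> events) {
            this.events = events;
        }

        void close() {
            events.add("close");
        }
    }

    /** Returns the recorded events in the order they happened. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        FakeRestoreManager manager = new FakeRestoreManager(events);

        // Stands in for the future returned by the asynchronous retry (assumption).
        CompletableFuture<String> retry = new CompletableFuture<>();

        // Cleanup is attached to the future itself, so it runs whether the retry
        // succeeds or fails, but never before the future has completed.
        retry.whenComplete((result, error) -> manager.close());

        events.add("registered");      // callback is registered, close() has not run
        events.add("restore-done");    // the simulated async work finishes here
        retry.complete("checkpoint");  // completing the future triggers close()
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Calling `manager.close()` directly after registering the callback would instead record `close` before `restore-done`, which is the premature close the NOTE warns about.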