prateekm commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1297752899


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
                       checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
                       taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
                       jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
+              future.whenComplete((r, e) -> {

Review Comment:
   This `whenComplete` callback looks like an async (fire-and-forget) operation: the map still holds the original `future`, which can be observed as complete before the callback has finished closing the restore manager. If so, CSM may think the restore managers (and stores) are closed and recreate/reopen them while the previous close is still in progress.
   
   Chain the close to the `restoreDeletedSnapshot` future using `handle`/`thenCompose` etc., and add the returned future to the map.
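
A rough sketch of the chaining, not the actual Samza code: `ChainedCloseSketch`, `RestoreManager`, and `closeAfter` are hypothetical stand-ins, and the map key and checkpoint are plain strings for brevity. The point is only that the future stored in the map is the one returned by `handle`, so it cannot complete before `close()` has run.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class ChainedCloseSketch {
  // Hypothetical stand-in for the task restore manager; only close() matters here.
  public interface RestoreManager {
    void close();
  }

  // Returns a new future that completes only after manager.close() has run,
  // carrying the same result or failure as restoreFuture.
  public static <T> CompletableFuture<T> closeAfter(
      CompletableFuture<T> restoreFuture, RestoreManager manager) {
    return restoreFuture.handle((result, error) -> {
      try {
        manager.close();
      } catch (Exception closeError) {
        // Assumption: as in the removed code, a failed close is logged and ignored.
        System.err.println("Error closing restore manager: " + closeError);
      }
      if (error != null) {
        // Propagate the restore failure to whoever waits on the returned future.
        throw (error instanceof CompletionException)
            ? (CompletionException) error
            : new CompletionException(error);
      }
      return result;
    });
  }

  public static void main(String[] args) {
    Map<String, CompletableFuture<String>> newTaskCheckpoints = new ConcurrentHashMap<>();
    CompletableFuture<String> restoreFuture = new CompletableFuture<>();
    AtomicBoolean closed = new AtomicBoolean(false);

    // Store the chained future, not restoreFuture itself.
    newTaskCheckpoints.put("task-0", closeAfter(restoreFuture, () -> closed.set(true)));

    restoreFuture.complete("checkpoint-0");
    String checkpoint = newTaskCheckpoints.get("task-0").join();
    System.out.println(checkpoint + ", closed=" + closed.get());
  }
}
```

Anything that waits on the map entry now also waits for the close, which is the ordering CSM needs before it recreates or reopens the stores.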
   
   



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
                       checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
                       taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
                       jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
+              future.whenComplete((r, e) -> {
                 updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
-              }
+                closeTaskRestoreManager(taskRestoreManager, taskName);
+              });
+              newTaskCheckpoints.put(taskInstanceName, future);
             } else {
               // log and rethrow exception to communicate restore failure
               String msg = String.format("Error restoring state for task: %s", taskName);
               LOG.error(msg, ex);
               throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
             }
-          }
-
-          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
-                ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be able to continue processing/backups
-            // if restore manager close fails.
+          } else {
+            // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+            // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
+            // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed
+            // before async restoreDeletedSnapshot() is complete.
+            closeTaskRestoreManager(taskRestoreManager, taskName);

Review Comment:
   Move the "on stop, ..." part of the comment to the method Javadoc, since it applies to both invocations. There is no explicit parallelization happening within the method, so in the Javadoc rephrase it as "This should be called concurrently for all tasks ...".



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
                       checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
                       taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
                       jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
+              future.whenComplete((r, e) -> {
                 updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
-              }
+                closeTaskRestoreManager(taskRestoreManager, taskName);
+              });
+              newTaskCheckpoints.put(taskInstanceName, future);
             } else {
               // log and rethrow exception to communicate restore failure
               String msg = String.format("Error restoring state for task: %s", taskName);
               LOG.error(msg, ex);
               throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
             }
-          }
-
-          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
-                ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be able to continue processing/backups
-            // if restore manager close fails.
+          } else {
+            // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+            // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
+            // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed
+            // before async restoreDeletedSnapshot() is complete.
+            closeTaskRestoreManager(taskRestoreManager, taskName);

Review Comment:
   Rephrase the second half of the comment (the NOTE) too. Explain what invariant it is trying to ensure, so that future refactors can make sure the invariant is maintained.
   
   E.g., it can just say: "taskRestoreManager must only be closed after the initial restore (the case here) or retried restore future is complete", or something like that. Then copy the same comment to the invocation above and move the "(the case here)" part to after "retried restore future".


