This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e8ac2e6d Close TaskRestoreManager only after all restores are complete  (#1682)
0e8ac2e6d is described below

commit 0e8ac2e6d7ae4ee88538c508a162c5e855a36135
Author: shekhars-li <[email protected]>
AuthorDate: Fri Aug 18 11:25:51 2023 -0700

    Close TaskRestoreManager only after all restores are complete  (#1682)
    
    Close TaskRestoreManager only after all restores are complete
---
 .../ContainerStorageManagerRestoreUtil.java        | 152 ++++++++++++++-------
 .../samza/storage/TestContainerStorageManager.java |  51 +++++--
 2 files changed, 145 insertions(+), 58 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
index db0a5be46..4912f9619 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
@@ -133,8 +133,8 @@ public class ContainerStorageManagerRestoreUtil {
       Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
ExecutorService executor,
       File loggedStoreDir, Set<String> forceRestoreTask) {
 
-    Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new 
ConcurrentHashMap<>();
-    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+    Map<TaskName, Checkpoint> newTaskCheckpoints = new ConcurrentHashMap<>();
+    List<Future<Void>> restoreAndCleanupFutures = new ArrayList<>();
 
     // Submit restore callable for each taskInstance
     taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
@@ -156,53 +156,101 @@ public class ContainerStorageManagerRestoreUtil {
           restoreFuture = taskRestoreManager.restore();
         }
 
-        CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res, 
ex) -> {
-          updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
-
-          if (ex != null) {
-            if (isUnwrappedExceptionDeletedException(ex)) {
-              LOG.warn("Received DeletedException during restore for task {}. 
Attempting to get blobs with getDeleted flag",
-                  taskInstanceName.getTaskName());
-
-              // Try to restore with getDeleted flag
-              CompletableFuture<Checkpoint> future =
-                  restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
-                      checkpointManager, taskRestoreManager, config, 
taskInstanceMetrics, executor,
-                      
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), 
loggedStoreDir,
-                      jobContext, containerModel);
-              try {
-                newTaskCheckpoints.put(taskInstanceName, future);
-              } catch (Exception e) {
-                String msg = String.format("DeletedException during restore 
task: %s after retrying to get deleted blobs.", taskName);
-                throw new SamzaException(msg, e);
-              } finally {
-                updateRestoreTime(startTime, samzaContainerMetrics, 
taskInstanceName);
-              }
-            } else {
-              // log and rethrow exception to communicate restore failure
-              String msg = String.format("Error restoring state for task: %s", 
taskName);
-              LOG.error(msg, ex);
-              throw new SamzaException(msg, ex); // wrap in unchecked 
exception to throw from lambda
-            }
-          }
+        // Order of the following async operations is critical. They are 
chained as follows:
+        // 1. check if restore succeeded in the first try -> if it failed with 
DeletedException, flag restore to be retried.
+        // 2. retry restore with getDeleted if previous step completes and 
returns true.
+        // 3. Close taskRestoreManager after 1 (and 2, if there is a retry) 
completes
 
-          // Stop all persistent stores after restoring. Certain persistent 
stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction 
(a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} 
restore", taskName,
-                ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be able to 
continue processing/backups
-            // if restore manager close fails.
-          }
-          return null;
-        });
-        taskRestoreFutures.add(taskRestoreFuture);
+        CompletableFuture<Boolean> retryRestoreFuture =
+            restoreFuture.handle(
+              (res, t) -> shouldRetryFailedRestore(t, startTime, 
samzaContainerMetrics, taskInstanceName));
+
+        CompletableFuture<Checkpoint> taskCheckpointFuture =
+            retryRestoreFuture.thenCompose(
+              shouldRetryRestore -> retryFailedRestore(shouldRetryRestore, 
taskInstanceName, taskCheckpoints,
+                  checkpointManager, taskRestoreManager, config, 
taskInstanceMetrics, executor, loggedStoreDir,
+                  jobContext, containerModel, factoryName, 
taskBackendFactoryToStoreNames));
+
+        CompletableFuture<Void> cleanUpFuture =
+            taskCheckpointFuture.handle(
+              (checkpoint, t) -> cleanUpResources(checkpoint, t, startTime, 
samzaContainerMetrics, taskInstanceName,
+                  taskRestoreManager, newTaskCheckpoints));
+
+        restoreAndCleanupFutures.add(cleanUpFuture);
       });
     });
-    CompletableFuture<Void> restoreFutures = 
CompletableFuture.allOf(taskRestoreFutures.toArray(new CompletableFuture[0]));
-    return restoreFutures.thenCompose(ignoredVoid -> 
FutureUtil.toFutureOfMap(newTaskCheckpoints));
+
+    return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new 
CompletableFuture[0]))
+        .thenApply(aVoid -> newTaskCheckpoints);
+  }
+
+  /**
+   * Check if the restore should be retried. If previous restore failed with 
DeletedException, this method returns true.
+   *
+   * @param ex failure of the first restore attempt, or null if it completed successfully
+   * @param startTime restore start time, passed to updateRestoreTime on the non-retryable failure path
+   * @param samzaContainerMetrics container metrics, passed to updateRestoreTime on the non-retryable failure path
+   * @param taskInstanceName task whose restore outcome is being inspected
+   * @return true if {@code ex} unwraps to a DeletedException; false if {@code ex} is null
+   * @throws SamzaException if {@code ex} is non-null and not a DeletedException (the error is logged first)
+   */
+  private static boolean shouldRetryFailedRestore(Throwable ex, long startTime,
+      SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+    if (ex != null) {
+      if (isUnwrappedExceptionDeletedException(ex)) {
+        // if the exception is of type DeletedException, retry restore (with 
getDeleted flag set to true).
+        return true;
+      } else {
+        // NOTE(review): throwing here completes the chained future exceptionally, so cleanUpResources
+        // (chained via handle) runs afterwards and also calls updateRestoreTime and logs
+        // "Error restoring state for task" - restore time is recorded and the failure logged twice on
+        // this path. Confirm whether that is intended.
+        updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
+        // log and rethrow exception to communicate restore failure
+        String msg = String.format("Error restoring state for task: %s", 
taskInstanceName.getTaskName());
+        LOG.error(msg, ex);
+        throw new SamzaException(msg, ex); // wrap in unchecked exception to 
throw from lambda
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Creates a future for either the new task checkpoint (if the restore was 
retried), or the old task checkpoint.
+   *
+   * @param shouldRetryRestore result of shouldRetryFailedRestore for the first restore attempt
+   * @param taskName task being restored
+   * @param factoryName state backend factory name, used with taskName to look up the stores to restore
+   * @param taskBackendFactoryToStoreNames store names keyed by task and then by backend factory name
+   *        (the remaining parameters are passed through unchanged to restoreDeletedSnapshot)
+   * @return the future returned by restoreDeletedSnapshot when a retry is requested; otherwise an
+   *         already-completed future holding {@code taskCheckpoints.get(taskName)}, which is null if the
+   *         map has no entry for the task
+   */
+  private static CompletableFuture<Checkpoint> retryFailedRestore(boolean 
shouldRetryRestore, TaskName taskName,
+      Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager 
checkpointManager,
+      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics,
+      ExecutorService executor, File loggedStoreBaseDirectory, JobContext 
jobContext, ContainerModel containerModel,
+      String factoryName, Map<TaskName, Map<String, Set<String>>> 
taskBackendFactoryToStoreNames) {
+    if (shouldRetryRestore) {
+      // Try to restore with getDeleted flag - return a new task checkpoint
+      // NOTE(review): the nested get(taskName).get(factoryName) throws NullPointerException if there is no
+      // entry for taskName; presumably the caller guarantees one - confirm.
+      return restoreDeletedSnapshot(taskName, taskCheckpoints, 
checkpointManager, taskRestoreManager, config,
+          taskInstanceMetrics, executor, 
taskBackendFactoryToStoreNames.get(taskName).get(factoryName),
+          loggedStoreBaseDirectory, jobContext, containerModel);
+    }
+    // if shouldRetryRestore is false, do not retry restore. This means 
restore completed successfully in the first try
+    // Return the old task checkpoint as no new checkpoint was created
+    return CompletableFuture.completedFuture(taskCheckpoints.get(taskName));
+  }
+
+  /**
+   * Cleans up all the resources (like taskRestoreManager) once the restore chain for a task has finished,
+   * updates the restore time, and records the task checkpoint returned from the previous future in the chain.
+   * NOTE: This method should be called concurrently for all {@link TaskRestoreManager}s so that stop() can be parallelized.
+   *       Certain persistent stores opened in BulkLoad mode are compacted on stop, so parallelizing stop()
+   *       also parallelizes their compaction (a time-intensive operation).
+   *
+   * @param checkpoint checkpoint produced by the previous stage; stored in newTaskCheckpoints only if non-null
+   * @param throwable failure from the previous stage, or null if it completed successfully
+   * @param startTime restore start time, passed to updateRestoreTime
+   * @param samzaContainerMetrics container metrics, passed to updateRestoreTime
+   * @param taskName task whose restore has finished
+   * @param taskRestoreManager restore manager to close; close failures are logged and ignored
+   * @param newTaskCheckpoints map that receives the task's checkpoint on success
+   * @return always null; the value only sequences the futures
+   * @throws SamzaException if {@code throwable} is non-null, with {@code throwable} as the cause
+   */
+  private static Void cleanUpResources(Checkpoint checkpoint, Throwable throwable, long startTime,
+      SamzaContainerMetrics samzaContainerMetrics, TaskName taskName, TaskRestoreManager taskRestoreManager,
+      Map<TaskName, Checkpoint> newTaskCheckpoints) {
+    // exception or not, update the restore time and close TaskRestoreManager as there will be no more retries.
+    updateRestoreTime(startTime, samzaContainerMetrics, taskName);
+    try {
+      taskRestoreManager.close();
+    } catch (Exception exception) {
+      // Supply a value for every {} placeholder; SLF4J treats the trailing Throwable as the exception to log,
+      // so without the outcome argument the message would print a literal "{}".
+      LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
+          throwable != null ? "unsuccessful" : "successful", exception);
+      // ignore exception from close. container may still be able to continue processing/backups
+      // if restore manager close fails.
+    }
+    if (throwable != null) {
+      String msg = String.format("Error restoring state for task: %s", taskName.getTaskName());
+      LOG.error(msg);
+      throw new SamzaException(msg, throwable);
+    }
+    if (checkpoint != null) {
+      newTaskCheckpoints.put(taskName, checkpoint);
+    }
+    return null;
   }
 
   /**
@@ -219,6 +267,8 @@ public class ContainerStorageManagerRestoreUtil {
       ExecutorService executor, Set<String> storesToRestore, File 
loggedStoreBaseDirectory, JobContext jobContext,
       ContainerModel containerModel) {
 
+    LOG.warn("Received DeletedException during restore for task {}. Attempting 
to get blobs with getDeleted set to true",
+        taskName.getTaskName());
     // if taskInstanceMetrics are specified use those for store metrics,
     // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
     MetricsRegistry metricsRegistry =
@@ -226,6 +276,7 @@ public class ContainerStorageManagerRestoreUtil {
             : new MetricsRegistryMap();
 
     BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
+    blobStoreManager.init();
     JobConfig jobConfig = new JobConfig(config);
     BlobStoreUtil blobStoreUtil =
         new BlobStoreUtil(blobStoreManager, executor, new 
BlobStoreConfig(config), null,
@@ -269,10 +320,13 @@ public class ContainerStorageManagerRestoreUtil {
         return CompletableFuture.allOf(deletePrevSnapshotFutures.toArray(new 
CompletableFuture[0]));
       });
 
-    // 5. create new checkpoint
     CompletableFuture<Checkpoint> newTaskCheckpointsFuture =
-        deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms) 
->
-            writeNewCheckpoint(taskName, checkpointId, scms, 
checkpointManager));
+        deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms) 
-> {
+          // cleanup resources
+          blobStoreManager.close();
+          // 5. create new checkpoint
+          return writeNewCheckpoint(taskName, checkpointId, scms, 
checkpointManager);
+        });
 
     return newTaskCheckpointsFuture.exceptionally(ex -> {
       String msg = String.format("Could not restore task: %s after attempting 
to restore deleted blobs.", taskName);
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 2b2b51885..fc7d21ee7 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -87,6 +87,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
@@ -583,10 +584,24 @@ public class TestContainerStorageManager {
   @Test
   public void testRestoreRecoversFromDeletedException() throws Exception {
     TaskName taskName = new TaskName("task");
-    Set<String> stores = Collections.singleton("store");
+    String storeName = "store";
+    Set<String> stores = Collections.singleton(storeName);
+    String jobName = "job";
+    String jobId = "jobId";
+
+    BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+    doNothing().when(blobStoreManager).init();
+    doNothing().when(blobStoreManager).close();
 
     BlobStoreRestoreManager taskRestoreManager = 
mock(BlobStoreRestoreManager.class);
-    doNothing().when(taskRestoreManager).init(any(Checkpoint.class));
+    doAnswer(invocation -> {
+      blobStoreManager.init();
+      return null;
+    }).when(taskRestoreManager).init(any(Checkpoint.class));
+    doAnswer(invocation -> {
+      blobStoreManager.close();
+      return null;
+    }).when(taskRestoreManager).close();
 
     CompletableFuture<Void> failedFuture = 
CompletableFuture.completedFuture(null)
         .thenCompose(v -> { throw new DeletedException("410 Gone"); });
@@ -615,7 +630,7 @@ public class TestContainerStorageManager {
 
     String expectedOldBlobId = "oldBlobId";
     when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
-        BlobStoreStateBackendFactory.class.getName(), ImmutableMap.of("store", 
expectedOldBlobId)));
+        BlobStoreStateBackendFactory.class.getName(), 
ImmutableMap.of(storeName, expectedOldBlobId)));
 
     Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName, 
checkpoint);
 
@@ -625,7 +640,7 @@ public class TestContainerStorageManager {
 
     Config config = new MapConfig(ImmutableMap.of(
         "blob.store.manager.factory", 
BlobStoreStateBackendFactory.class.getName(),
-        "job.name", "test"));
+        "job.name", jobName));
 
     ExecutorService executor = Executors.newFixedThreadPool(5);
 
@@ -634,23 +649,25 @@ public class TestContainerStorageManager {
     // mock ReflectionUtil.getObj
     PowerMockito.mockStatic(ReflectionUtil.class);
     BlobStoreManagerFactory blobStoreManagerFactory = 
mock(BlobStoreManagerFactory.class);
-    BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
     PowerMockito.when(ReflectionUtil.getObj(anyString(), 
eq(BlobStoreManagerFactory.class)))
         .thenReturn(blobStoreManagerFactory);
     when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class), 
any(ExecutorService.class)))
         .thenReturn(blobStoreManager);
 
+    // To verify order of operations
+    InOrder inOrder = inOrder(blobStoreManager, taskRestoreManager);
+
     // mock ContainerStorageManagerRestoreUtil.backupRecoveredStore
     String expectedBlobId = "blobId";
     PowerMockito.spy(ContainerStorageManagerRestoreUtil.class);
-    
PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of("store",
 expectedBlobId)))
+    
PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of(storeName,
 expectedBlobId)))
         .when(ContainerStorageManagerRestoreUtil.class, "backupRecoveredStore",
             any(JobContext.class), any(ContainerModel.class), 
any(Config.class),
             any(TaskName.class), any(Set.class), any(Checkpoint.class), 
any(File.class),
             any(BlobStoreManager.class), any(MetricsRegistry.class), 
any(ExecutorService.class));
 
     SnapshotIndex snapshotIndex = new SnapshotIndex(System.currentTimeMillis(),
-        new SnapshotMetadata(CheckpointId.create(), "job", "test", "task", 
"store"),
+        new SnapshotMetadata(CheckpointId.create(), jobName, jobId, 
taskName.getTaskName(), storeName),
         new DirIndex("test", new ArrayList<>(), new ArrayList<>(), new 
ArrayList<>(), new ArrayList<>()),
         Optional.empty());
 
@@ -672,11 +689,17 @@ public class TestContainerStorageManager {
     when(blobStoreManager.delete(deleteBlobIdCaptor.capture(), 
any(Metadata.class)))
         .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
 
-    Map<TaskName, Checkpoint> updatedTaskCheckpoints =
+    CompletableFuture<Map<TaskName, Checkpoint>> updatedTaskCheckpointsFuture =
         
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName,
 factoryToTaskRestoreManager),
             samzaContainerMetrics, checkpointManager, jobContext, 
containerModel, taskCheckpoints,
             taskBackendFactoryToStoreNames, config, executor, new HashMap<>(), 
null,
-            ImmutableMap.of("store", systemConsumer)).get();
+            ImmutableMap.of(storeName, systemConsumer));
+
+    // verify close is not called until init and restore futures are complete
+    verify(taskRestoreManager, never()).close();
+    Map<TaskName, Checkpoint> updatedTaskCheckpoints = 
updatedTaskCheckpointsFuture.get();
+    // verify close is called only once after restore future is complete
+    verify(taskRestoreManager, times(1)).close();
 
     // verify taskCheckpoint is updated
     assertNotEquals(((CheckpointV2) 
taskCheckpoints.get(taskName)).getCheckpointId(),
@@ -696,6 +719,16 @@ public class TestContainerStorageManager {
     // verify that GET and delete was called on the old SnapshotIndex
     assertEquals(expectedOldBlobId, getBlobIdCaptor.getAllValues().get(1));
     assertEquals(expectedOldBlobId, deleteBlobIdCaptor.getValue());
+
+    // verify the order of operations in taskRestoreManager and 
blobStoreManager
+    // Verifies that close is called after restore(true)
+    inOrder.verify(taskRestoreManager).init(any(Checkpoint.class));
+    inOrder.verify(blobStoreManager).init(); // init called on 
blobStoreManager passed to taskRestoreManager
+    inOrder.verify(taskRestoreManager).restore();
+    inOrder.verify(blobStoreManager).init(); // init called on 
blobStoreManager created in 
ContainerStorageManagerRestoreUtil#restoreDeletedSnapshot
+    inOrder.verify(taskRestoreManager).restore(true);
+    inOrder.verify(blobStoreManager).close(); // close called on 
blobStoreManager created in 
ContainerStorageManagerRestoreUtil#restoreDeletedSnapshot
+    inOrder.verify(blobStoreManager).close(); // close called on 
blobStoreManager passed to taskRestoreManager
   }
 
   @Test

Reply via email to