This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0e8ac2e6d Close TaskRestoreManager only after all restores are complete (#1682)
0e8ac2e6d is described below
commit 0e8ac2e6d7ae4ee88538c508a162c5e855a36135
Author: shekhars-li <[email protected]>
AuthorDate: Fri Aug 18 11:25:51 2023 -0700
Close TaskRestoreManager only after all restores are complete (#1682)
Close TaskRestoreManager only after all restores are complete
---
.../ContainerStorageManagerRestoreUtil.java | 152 ++++++++++++++-------
.../samza/storage/TestContainerStorageManager.java | 51 +++++--
2 files changed, 145 insertions(+), 58 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
index db0a5be46..4912f9619 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java
@@ -133,8 +133,8 @@ public class ContainerStorageManagerRestoreUtil {
Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
ExecutorService executor,
File loggedStoreDir, Set<String> forceRestoreTask) {
- Map<TaskName, CompletableFuture<Checkpoint>> newTaskCheckpoints = new
ConcurrentHashMap<>();
- List<Future<Void>> taskRestoreFutures = new ArrayList<>();
+ Map<TaskName, Checkpoint> newTaskCheckpoints = new ConcurrentHashMap<>();
+ List<Future<Void>> restoreAndCleanupFutures = new ArrayList<>();
// Submit restore callable for each taskInstance
taskRestoreManagers.forEach((taskInstanceName, restoreManagersMap) -> {
@@ -156,53 +156,101 @@ public class ContainerStorageManagerRestoreUtil {
restoreFuture = taskRestoreManager.restore();
}
- CompletableFuture<Void> taskRestoreFuture = restoreFuture.handle((res,
ex) -> {
- updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
-
- if (ex != null) {
- if (isUnwrappedExceptionDeletedException(ex)) {
- LOG.warn("Received DeletedException during restore for task {}.
Attempting to get blobs with getDeleted flag",
- taskInstanceName.getTaskName());
-
- // Try to restore with getDeleted flag
- CompletableFuture<Checkpoint> future =
- restoreDeletedSnapshot(taskInstanceName, taskCheckpoints,
- checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor,
-
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName),
loggedStoreDir,
- jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore
task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
- updateRestoreTime(startTime, samzaContainerMetrics,
taskInstanceName);
- }
- } else {
- // log and rethrow exception to communicate restore failure
- String msg = String.format("Error restoring state for task: %s",
taskName);
- LOG.error(msg, ex);
- throw new SamzaException(msg, ex); // wrap in unchecked
exception to throw from lambda
- }
- }
+ // Order of the following async operations is critical. They are
chained as follows:
+ // 1. check if restore succeeded in the first try -> if it failed with
DeletedException, flag restore to be retried.
+ // 2. retry restore with getDeleted if previous step completes and
returns true.
+ // 3. Close taskRestoreManager after 1 (and 2, if there is a retry)
completes
- // Stop all persistent stores after restoring. Certain persistent
stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction
(a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {}
restore", taskName,
- ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be able to
continue processing/backups
- // if restore manager close fails.
- }
- return null;
- });
- taskRestoreFutures.add(taskRestoreFuture);
+ CompletableFuture<Boolean> retryRestoreFuture =
+ restoreFuture.handle(
+ (res, t) -> shouldRetryFailedRestore(t, startTime,
samzaContainerMetrics, taskInstanceName));
+
+ CompletableFuture<Checkpoint> taskCheckpointFuture =
+ retryRestoreFuture.thenCompose(
+ shouldRetryRestore -> retryFailedRestore(shouldRetryRestore,
taskInstanceName, taskCheckpoints,
+ checkpointManager, taskRestoreManager, config,
taskInstanceMetrics, executor, loggedStoreDir,
+ jobContext, containerModel, factoryName,
taskBackendFactoryToStoreNames));
+
+ CompletableFuture<Void> cleanUpFuture =
+ taskCheckpointFuture.handle(
+ (checkpoint, t) -> cleanUpResources(checkpoint, t, startTime,
samzaContainerMetrics, taskInstanceName,
+ taskRestoreManager, newTaskCheckpoints));
+
+ restoreAndCleanupFutures.add(cleanUpFuture);
});
});
- CompletableFuture<Void> restoreFutures =
CompletableFuture.allOf(taskRestoreFutures.toArray(new CompletableFuture[0]));
- return restoreFutures.thenCompose(ignoredVoid ->
FutureUtil.toFutureOfMap(newTaskCheckpoints));
+
+ return CompletableFuture.allOf(restoreAndCleanupFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(aVoid -> newTaskCheckpoints);
+ }
+
+ /**
+ * Check if the restore should be retried. If the previous restore failed with a
+ * DeletedException (as detected by isUnwrappedExceptionDeletedException), this method returns true.
+ *
+ * <p>Used as the handle() callback on the first restore future, so it runs whether that restore
+ * completed normally or exceptionally. Returning true makes retryFailedRestore retry the restore via
+ * restoreDeletedSnapshot. Throwing fails the rest of the chain, which still reaches cleanUpResources
+ * through its handle() step.
+ *
+ * <p>NOTE(review): on a non-DeletedException failure this method calls updateRestoreTime and logs the
+ * error. cleanUpResources then calls updateRestoreTime and logs again for the same failure, so the
+ * restore time is updated twice on that path. Consider leaving both to cleanUpResources.
+ *
+ * @param ex failure of the first restore attempt, or null if it completed normally
+ * @param startTime restore start time, passed to updateRestoreTime on a non-retryable failure
+ * @param samzaContainerMetrics container metrics, passed to updateRestoreTime on a non-retryable failure
+ * @param taskInstanceName task whose restore outcome is being checked
+ * @return true if the failure was a DeletedException and the restore should be retried; false if the
+ *         first restore completed normally
+ * @throws SamzaException wrapping {@code ex} if the restore failed with any other exception
+ */
+ private static boolean shouldRetryFailedRestore(Throwable ex, long startTime,
+ SamzaContainerMetrics samzaContainerMetrics, TaskName taskInstanceName) {
+ if (ex != null) {
+ if (isUnwrappedExceptionDeletedException(ex)) {
+ // if the exception is of type DeletedException, retry restore (with
getDeleted flag set to true).
+ return true;
+ } else {
+ updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
+ // log and rethrow exception to communicate restore failure
+ String msg = String.format("Error restoring state for task: %s",
taskInstanceName.getTaskName());
+ LOG.error(msg, ex);
+ throw new SamzaException(msg, ex); // wrap in unchecked exception to
throw from lambda
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Creates a future for either the new task checkpoint (if the restore was
+ * retried), or the old task checkpoint.
+ *
+ * <p>Used as the thenCompose() step after shouldRetryFailedRestore. It therefore only runs when the
+ * first restore either completed normally or failed with a DeletedException; any other failure skips
+ * this step and goes straight to cleanUpResources.
+ *
+ * <p>NOTE(review): on the no-retry path the old checkpoint is returned. cleanUpResources stores any
+ * non-null result in newTaskCheckpoints, so the map returned by initAndRestoreTaskInstances also
+ * contains the unchanged checkpoints of tasks that were not retried. Confirm callers expect that.
+ *
+ * <p>NOTE(review): taskBackendFactoryToStoreNames.get(taskName).get(factoryName) throws
+ * NullPointerException if the task or factory has no entry. Because this runs inside thenCompose,
+ * that surfaces as an exceptionally completed future.
+ *
+ * @param shouldRetryRestore true to retry the restore via restoreDeletedSnapshot
+ * @param taskName task being restored
+ * @param taskCheckpoints checkpoints by task; the entry for taskName is returned when not retrying
+ * @param factoryName state backend factory name used to look up the store names to restore
+ * @param taskBackendFactoryToStoreNames store names keyed by task, then by backend factory name
+ *        (all other parameters are passed through unchanged to restoreDeletedSnapshot)
+ * @return the checkpoint future from restoreDeletedSnapshot when retrying; otherwise an already
+ *         completed future holding {@code taskCheckpoints.get(taskName)}, which may be null
+ */
+ private static CompletableFuture<Checkpoint> retryFailedRestore(boolean
shouldRetryRestore, TaskName taskName,
+ Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager
checkpointManager,
+ TaskRestoreManager taskRestoreManager, Config config, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics,
+ ExecutorService executor, File loggedStoreBaseDirectory, JobContext
jobContext, ContainerModel containerModel,
+ String factoryName, Map<TaskName, Map<String, Set<String>>>
taskBackendFactoryToStoreNames) {
+ if (shouldRetryRestore) {
+ // Try to restore with getDeleted flag - return a new task checkpoint
+ return restoreDeletedSnapshot(taskName, taskCheckpoints,
checkpointManager, taskRestoreManager, config,
+ taskInstanceMetrics, executor,
taskBackendFactoryToStoreNames.get(taskName).get(factoryName),
+ loggedStoreBaseDirectory, jobContext, containerModel);
+ }
+ // if shouldRetryRestore is false, do not retry restore. This means
restore completed successfully in the first try
+ // Return the old task checkpoint as no new checkpoint was created
+ return CompletableFuture.completedFuture(taskCheckpoints.get(taskName));
+ }
+
+ /**
+ * Cleans up after one task's restore chain finishes. It updates the restore time, closes the
+ * taskRestoreManager, and records the checkpoint returned from the previous future in the chain.
+ *
+ * <p>Used as the final handle() step, so it runs whether the restore (and any retry) succeeded or
+ * failed. A failure to close the restore manager is logged and ignored.
+ *
+ * <p>NOTE: This method should be called concurrently for all {@link TaskRestoreManager}s so that
+ * stop() can be parallelized. Certain persistent stores opened in BulkLoad mode are compacted on
+ * stop, so parallelizing stop() also parallelizes their compaction (a time-intensive operation).
+ *
+ * <p>NOTE(review): the close-failure LOG.error has two {} placeholders but only taskName and
+ * exception as arguments. Assuming LOG is an SLF4J logger, then depending on the SLF4J version either:
+ * <ul>
+ * <li>exception fills the second {} and its stack trace is not logged, or</li>
+ * <li>the second {} is printed literally.</li>
+ * </ul>
+ * Consider passing {@code throwable != null ? "unsuccessful" : "successful"} before exception.
+ *
+ * <p>NOTE(review): when the first restore fails with a non-DeletedException error,
+ * shouldRetryFailedRestore has already called updateRestoreTime and logged the failure. This method
+ * then updates the restore time and logs a second time. On that path {@code throwable} is typically a
+ * CompletionException wrapping that SamzaException, so the exception thrown here nests two
+ * SamzaExceptions. Also, LOG.error(msg) below does not pass the throwable.
+ *
+ * @param checkpoint checkpoint produced by retryFailedRestore; null if the chain failed or no
+ *        checkpoint was found for the task
+ * @param throwable failure from the restore chain, or null if it completed normally
+ * @param startTime restore start time, passed to updateRestoreTime
+ * @param samzaContainerMetrics container metrics, passed to updateRestoreTime
+ * @param taskName task whose restore finished
+ * @param taskRestoreManager restore manager to close; exceptions from close() are logged and ignored
+ * @param newTaskCheckpoints output map; receives {@code checkpoint} when there was no failure and
+ *        checkpoint is non-null
+ * @return always null (Void) when the restore chain completed normally
+ * @throws SamzaException wrapping {@code throwable} if the restore chain failed
+ */
+ private static Void cleanUpResources(Checkpoint checkpoint, Throwable
throwable, long startTime,
+ SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
TaskRestoreManager taskRestoreManager,
+ Map<TaskName, Checkpoint> newTaskCheckpoints) {
+ // exception or not, update the restore time and close TaskRestoreManager
as there will be no more retries.
+ updateRestoreTime(startTime, samzaContainerMetrics, taskName); // NOTE(review): second call on the non-DeletedException failure path; see javadoc
+ try {
+ taskRestoreManager.close();
+ } catch (Exception exception) {
+ LOG.error("Error closing restore manager for task: {} after {} restore", // NOTE(review): 2 placeholders, 1 format arg; see javadoc
taskName, exception);
+ // ignore exception from close. container may still be able to continue
processing/backups
+ // if restore manager close fails.
+ }
+ if (throwable != null) {
+ String msg = String.format("Error restoring state for task: %s",
taskName.getTaskName());
+ LOG.error(msg); // NOTE(review): throwable is not passed, so its stack trace is not logged here
+ throw new SamzaException(msg, throwable);
+ }
+ if (checkpoint != null) {
+ newTaskCheckpoints.put(taskName, checkpoint);
+ }
+ return null;
}
/**
@@ -219,6 +267,8 @@ public class ContainerStorageManagerRestoreUtil {
ExecutorService executor, Set<String> storesToRestore, File
loggedStoreBaseDirectory, JobContext jobContext,
ContainerModel containerModel) {
+ LOG.warn("Received DeletedException during restore for task {}. Attempting
to get blobs with getDeleted set to true",
+ taskName.getTaskName());
// if taskInstanceMetrics are specified use those for store metrics,
// otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
MetricsRegistry metricsRegistry =
@@ -226,6 +276,7 @@ public class ContainerStorageManagerRestoreUtil {
: new MetricsRegistryMap();
BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
+ blobStoreManager.init();
JobConfig jobConfig = new JobConfig(config);
BlobStoreUtil blobStoreUtil =
new BlobStoreUtil(blobStoreManager, executor, new
BlobStoreConfig(config), null,
@@ -269,10 +320,13 @@ public class ContainerStorageManagerRestoreUtil {
return CompletableFuture.allOf(deletePrevSnapshotFutures.toArray(new
CompletableFuture[0]));
});
- // 5. create new checkpoint
CompletableFuture<Checkpoint> newTaskCheckpointsFuture =
- deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms)
->
- writeNewCheckpoint(taskName, checkpointId, scms,
checkpointManager));
+ deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms)
-> {
+ // cleanup resources
+ blobStoreManager.close();
+ // 5. create new checkpoint
+ return writeNewCheckpoint(taskName, checkpointId, scms,
checkpointManager);
+ });
return newTaskCheckpointsFuture.exceptionally(ex -> {
String msg = String.format("Could not restore task: %s after attempting
to restore deleted blobs.", taskName);
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 2b2b51885..fc7d21ee7 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -87,6 +87,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
@@ -583,10 +584,24 @@ public class TestContainerStorageManager {
@Test
public void testRestoreRecoversFromDeletedException() throws Exception {
TaskName taskName = new TaskName("task");
- Set<String> stores = Collections.singleton("store");
+ String storeName = "store";
+ Set<String> stores = Collections.singleton(storeName);
+ String jobName = "job";
+ String jobId = "jobId";
+
+ BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
+ doNothing().when(blobStoreManager).init();
+ doNothing().when(blobStoreManager).close();
BlobStoreRestoreManager taskRestoreManager =
mock(BlobStoreRestoreManager.class);
- doNothing().when(taskRestoreManager).init(any(Checkpoint.class));
+ doAnswer(invocation -> {
+ blobStoreManager.init();
+ return null;
+ }).when(taskRestoreManager).init(any(Checkpoint.class));
+ doAnswer(invocation -> {
+ blobStoreManager.close();
+ return null;
+ }).when(taskRestoreManager).close();
CompletableFuture<Void> failedFuture =
CompletableFuture.completedFuture(null)
.thenCompose(v -> { throw new DeletedException("410 Gone"); });
@@ -615,7 +630,7 @@ public class TestContainerStorageManager {
String expectedOldBlobId = "oldBlobId";
when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
- BlobStoreStateBackendFactory.class.getName(), ImmutableMap.of("store",
expectedOldBlobId)));
+ BlobStoreStateBackendFactory.class.getName(),
ImmutableMap.of(storeName, expectedOldBlobId)));
Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName,
checkpoint);
@@ -625,7 +640,7 @@ public class TestContainerStorageManager {
Config config = new MapConfig(ImmutableMap.of(
"blob.store.manager.factory",
BlobStoreStateBackendFactory.class.getName(),
- "job.name", "test"));
+ "job.name", jobName));
ExecutorService executor = Executors.newFixedThreadPool(5);
@@ -634,23 +649,25 @@ public class TestContainerStorageManager {
// mock ReflectionUtil.getObj
PowerMockito.mockStatic(ReflectionUtil.class);
BlobStoreManagerFactory blobStoreManagerFactory =
mock(BlobStoreManagerFactory.class);
- BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
PowerMockito.when(ReflectionUtil.getObj(anyString(),
eq(BlobStoreManagerFactory.class)))
.thenReturn(blobStoreManagerFactory);
when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class),
any(ExecutorService.class)))
.thenReturn(blobStoreManager);
+ // To verify order of operations
+ InOrder inOrder = inOrder(blobStoreManager, taskRestoreManager);
+
// mock ContainerStorageManagerRestoreUtil.backupRecoveredStore
String expectedBlobId = "blobId";
PowerMockito.spy(ContainerStorageManagerRestoreUtil.class);
-
PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of("store",
expectedBlobId)))
+
PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of(storeName,
expectedBlobId)))
.when(ContainerStorageManagerRestoreUtil.class, "backupRecoveredStore",
any(JobContext.class), any(ContainerModel.class),
any(Config.class),
any(TaskName.class), any(Set.class), any(Checkpoint.class),
any(File.class),
any(BlobStoreManager.class), any(MetricsRegistry.class),
any(ExecutorService.class));
SnapshotIndex snapshotIndex = new SnapshotIndex(System.currentTimeMillis(),
- new SnapshotMetadata(CheckpointId.create(), "job", "test", "task",
"store"),
+ new SnapshotMetadata(CheckpointId.create(), jobName, jobId,
taskName.getTaskName(), storeName),
new DirIndex("test", new ArrayList<>(), new ArrayList<>(), new
ArrayList<>(), new ArrayList<>()),
Optional.empty());
@@ -672,11 +689,17 @@ public class TestContainerStorageManager {
when(blobStoreManager.delete(deleteBlobIdCaptor.capture(),
any(Metadata.class)))
.thenAnswer(invocation -> CompletableFuture.completedFuture(null));
- Map<TaskName, Checkpoint> updatedTaskCheckpoints =
+ CompletableFuture<Map<TaskName, Checkpoint>> updatedTaskCheckpointsFuture =
ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName,
factoryToTaskRestoreManager),
samzaContainerMetrics, checkpointManager, jobContext,
containerModel, taskCheckpoints,
taskBackendFactoryToStoreNames, config, executor, new HashMap<>(),
null,
- ImmutableMap.of("store", systemConsumer)).get();
+ ImmutableMap.of(storeName, systemConsumer));
+
+ // verify close is not called until init and restore futures are complete
+ verify(taskRestoreManager, never()).close();
+ Map<TaskName, Checkpoint> updatedTaskCheckpoints =
updatedTaskCheckpointsFuture.get();
+ // verify close is called only once after restore future is complete
+ verify(taskRestoreManager, times(1)).close();
// verify taskCheckpoint is updated
assertNotEquals(((CheckpointV2)
taskCheckpoints.get(taskName)).getCheckpointId(),
@@ -696,6 +719,16 @@ public class TestContainerStorageManager {
// verify that GET and delete was called on the old SnapshotIndex
assertEquals(expectedOldBlobId, getBlobIdCaptor.getAllValues().get(1));
assertEquals(expectedOldBlobId, deleteBlobIdCaptor.getValue());
+
+ // verify the order of operations in taskRestoreManager and
blobStoreManager
+ // Verifies that close is called after restore(true)
+ inOrder.verify(taskRestoreManager).init(any(Checkpoint.class));
+ inOrder.verify(blobStoreManager).init(); // init called on
blobStoreManager passed to taskRestoreManager
+ inOrder.verify(taskRestoreManager).restore();
+ inOrder.verify(blobStoreManager).init(); // init called on
blobStoreManager created in
ContainerStorageManagerRestoreUtil#restoreDeletedSnapshot
+ inOrder.verify(taskRestoreManager).restore(true);
+ inOrder.verify(blobStoreManager).close(); // close called on
blobStoreManager created in
ContainerStorageManagerRestoreUtil#restoreDeletedSnapshot
+ inOrder.verify(blobStoreManager).close(); // close called on
blobStoreManager passed to taskRestoreManager
}
@Test