prateekm commented on code in PR #1682:
URL: https://github.com/apache/samza/pull/1682#discussion_r1297752899
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
+ future.whenComplete((r, e) -> {
Review Comment:
This looks like an async (fire-and-forget) operation: the future returned by `whenComplete` is discarded, and the original `future` is what goes into the map. In that case, CSM may think the restore managers (and stores) are closed and recreate/reopen them while the previous close is still in progress.
Chain this to the `restoreDeletedSnapshot` future using `handle`/`thenCompose` etc., and add the returned future to the map.
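A minimal sketch of the suggested chaining, assuming the retried restore yields a `CompletableFuture` of the new checkpoint. `FakeRestoreManager`, `closeWhenDone` and the `String` checkpoints are hypothetical stand-ins, not Samza APIs. It relies on `whenComplete` returning a new dependent future that completes only after the callback has run (with the same result or exception), so putting that future in the map, rather than the original, makes anyone waiting on the map also wait for `close()`. `handle`/`thenCompose` chaining works just as well, as long as the returned future is what goes into the map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChainedCloseSketch {

  /** Hypothetical stand-in for a TaskRestoreManager; it only records whether close() ran. */
  static class FakeRestoreManager {
    final AtomicBoolean closed = new AtomicBoolean(false);

    void close() {
      closed.set(true);
    }
  }

  /**
   * Closes the manager once the retried restore finishes and returns the dependent future.
   * whenComplete() yields a new future that completes only after the callback has run, with
   * the same result or exception as retriedRestore.
   */
  static CompletableFuture<String> closeWhenDone(
      CompletableFuture<String> retriedRestore, FakeRestoreManager manager) {
    return retriedRestore.whenComplete((checkpoint, error) -> manager.close());
  }

  public static void main(String[] args) {
    Map<String, CompletableFuture<String>> newTaskCheckpoints = new HashMap<>();

    // Successful retry: the map entry completes only after close() has run.
    FakeRestoreManager ok = new FakeRestoreManager();
    CompletableFuture<String> retried =
        CompletableFuture.supplyAsync(() -> "checkpoint-after-retry");
    newTaskCheckpoints.put("Partition 0", closeWhenDone(retried, ok));
    String checkpoint = newTaskCheckpoints.get("Partition 0").join();
    System.out.println(checkpoint + ", closed=" + ok.closed.get());

    // Failed retry: close() still runs, and the failure still reaches whoever joins the entry.
    FakeRestoreManager failedManager = new FakeRestoreManager();
    CompletableFuture<String> failing = new CompletableFuture<>();
    failing.completeExceptionally(new IllegalStateException("retry failed"));
    newTaskCheckpoints.put("Partition 1", closeWhenDone(failing, failedManager));
    try {
      newTaskCheckpoints.get("Partition 1").join();
    } catch (CompletionException e) {
      System.out.println(
          "failed: " + e.getCause().getMessage() + ", closed=" + failedManager.closed.get());
    }
  }
}
```

The key line is that the map stores the result of `whenComplete(...)`; storing `retried` instead would reproduce the fire-and-forget behavior described above.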
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
+ future.whenComplete((r, e) -> {
updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
- }
+ closeTaskRestoreManager(taskRestoreManager, taskName);
+ });
+ newTaskCheckpoints.put(taskInstanceName, future);
} else {
// log and rethrow exception to communicate restore failure
String msg = String.format("Error restoring state for task: %s", taskName);
LOG.error(msg, ex);
throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
}
- }
-
- // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
- ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be able to continue processing/backups
- // if restore manager close fails.
+ } else {
+ // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+ // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
+ // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed
+ // before async restoreDeletedSnapshot() is complete.
+ closeTaskRestoreManager(taskRestoreManager, taskName);
Review Comment:
Move the ("on stop, ...") comment to the method javadoc, since it applies to both invocations. There is no explicit parallelization happening within the method, so in the javadoc rephrase it as "This should be called concurrently for all tasks ...".
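A sketch of what the helper and its javadoc might look like, assuming the helper keeps the log-and-ignore behavior of the removed inline block. `RestoreManager` is a hypothetical stand-in for `TaskRestoreManager`, and `java.util.logging` replaces Samza's SLF4J logger only to keep the example self-contained.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class CloseHelperSketch {
  private static final Logger LOG = Logger.getLogger(CloseHelperSketch.class.getName());

  /** Hypothetical stand-in for TaskRestoreManager; only close() matters here. */
  @FunctionalInterface
  interface RestoreManager {
    void close() throws Exception;
  }

  /**
   * Stops all persistent stores held by the task's restore manager. Certain persistent stores
   * opened in BulkLoad mode are compacted on stop, a time-intensive operation, so this should be
   * called concurrently for all tasks rather than one task at a time.
   *
   * <p>Exceptions from close() are logged and ignored: the container may still be able to
   * continue processing and backups if the restore manager fails to close.
   */
  static void closeTaskRestoreManager(RestoreManager taskRestoreManager, String taskName) {
    try {
      taskRestoreManager.close();
    } catch (Exception e) {
      LOG.log(Level.SEVERE, "Error closing restore manager for task: " + taskName, e);
    }
  }

  public static void main(String[] args) {
    closeTaskRestoreManager(() -> System.out.println("stores stopped"), "Partition 0");
    // A failing close is logged, not rethrown.
    closeTaskRestoreManager(() -> { throw new Exception("compaction failed"); }, "Partition 1");
    System.out.println("continuing after close failure");
  }
}
```

Because the helper swallows exceptions, calling it from a `whenComplete` callback cannot turn a successful restore into a failed future.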
##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java:
##########
@@ -170,31 +170,23 @@ private static CompletableFuture<Map<TaskName, Checkpoint>> restoreAllTaskInstan
checkpointManager, taskRestoreManager, config, taskInstanceMetrics, executor,
taskBackendFactoryToStoreNames.get(taskInstanceName).get(factoryName), loggedStoreDir,
jobContext, containerModel);
- try {
- newTaskCheckpoints.put(taskInstanceName, future);
- } catch (Exception e) {
- String msg = String.format("DeletedException during restore task: %s after retrying to get deleted blobs.", taskName);
- throw new SamzaException(msg, e);
- } finally {
+ future.whenComplete((r, e) -> {
updateRestoreTime(startTime, samzaContainerMetrics, taskInstanceName);
- }
+ closeTaskRestoreManager(taskRestoreManager, taskName);
+ });
+ newTaskCheckpoints.put(taskInstanceName, future);
} else {
// log and rethrow exception to communicate restore failure
String msg = String.format("Error restoring state for task: %s", taskName);
LOG.error(msg, ex);
throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
}
- }
-
- // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
- // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
- try {
- taskRestoreManager.close();
- } catch (Exception e) {
- LOG.error("Error closing restore manager for task: {} after {} restore", taskName,
- ex != null ? "unsuccessful" : "successful", e);
- // ignore exception from close. container may still be able to continue processing/backups
- // if restore manager close fails.
+ } else {
+ // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
+ // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
+ // NOTE: closing the taskRestoreManager outside this else block may cause taskRestoreManager to be closed
+ // before async restoreDeletedSnapshot() is complete.
+ closeTaskRestoreManager(taskRestoreManager, taskName);
Review Comment:
Rephrase the second half too. Explain what invariant this is trying to ensure, so that future refactors maintain it.
E.g., you could just say: "taskRestoreManager must only be closed after the initial restore (the case here) or the retried restore future is complete," or something like that. Then copy the same comment to the call site above, and move "(the case here)" to after "the retried restore future".
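A sketch of how the invariant comment could sit at both call sites, using the `handle`/`thenCompose` chaining suggested in the first comment. `DeletedException`, `restoreThenClose` and the `Supplier`/`Runnable` parameters are hypothetical stand-ins for the real restore, retry and close calls; the non-retryable branch rethrows without closing, mirroring the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class RestoreInvariantSketch {

  /** Hypothetical stand-in for the exception that triggers a retried restore. */
  static class DeletedException extends RuntimeException {
    DeletedException(String message) {
      super(message);
    }
  }

  // Futures from supplyAsync wrap failures in CompletionException; unwrap to inspect the cause.
  static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }

  /** Returns a future that completes only after the restore manager has been closed. */
  static CompletableFuture<String> restoreThenClose(
      CompletableFuture<String> initialRestore,
      Supplier<CompletableFuture<String>> retriedRestore,
      Runnable closeTaskRestoreManager) {
    CompletableFuture<CompletableFuture<String>> staged =
        initialRestore.handle((checkpoint, ex) -> {
          if (ex == null) {
            // taskRestoreManager must only be closed after the initial restore (the case here)
            // or the retried restore future is complete.
            closeTaskRestoreManager.run();
            return CompletableFuture.completedFuture(checkpoint);
          }
          if (unwrap(ex) instanceof DeletedException) {
            // taskRestoreManager must only be closed after the initial restore or the retried
            // restore future (the case here) is complete.
            return retriedRestore.get().whenComplete((r, e) -> closeTaskRestoreManager.run());
          }
          // Any other failure is rethrown so the returned future fails, as in the diff.
          throw new CompletionException(unwrap(ex));
        });
    // Flatten so callers wait on the inner future, including the retry and its close().
    return staged.thenCompose(f -> f);
  }

  public static void main(String[] args) {
    CompletableFuture<String> deleted = new CompletableFuture<>();
    deleted.completeExceptionally(new DeletedException("blob deleted"));
    String checkpoint = restoreThenClose(
        deleted,
        () -> CompletableFuture.supplyAsync(() -> "checkpoint-after-retry"),
        () -> System.out.println("restore manager closed")).join();
    System.out.println(checkpoint);
  }
}
```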
--
This is an automated message from the Apache Git Service.