steveloughran commented on code in PR #6716:
URL: https://github.com/apache/hadoop/pull/6716#discussion_r1576221197


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java:
##########
@@ -582,19 +611,111 @@ protected final Path directoryMustExist(
    * Save a task manifest or summary. This will be done by
    * writing to a temp path and then renaming.
    * If the destination path exists: Delete it.
+   * This will retry so that a rename failure from abfs load or IO errors
+   * will not fail the task.
    * @param manifestData the manifest/success file
    * @param tempPath temp path for the initial save
    * @param finalPath final path for rename.
-   * @throws IOException failure to load/parse
+   * @return the manifest saved.
+   * @throws IOException failure to rename after retries.
    */
   @SuppressWarnings("unchecked")
-  protected final <T extends AbstractManifestData> void save(T manifestData,
+  protected final <T extends AbstractManifestData> T save(
+      final T manifestData,
       final Path tempPath,
       final Path finalPath) throws IOException {
-    LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, 
finalPath);
-    trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
-        operations.save(manifestData, tempPath, true));
-    renameFile(tempPath, finalPath);
+    return saveManifest(() -> manifestData, tempPath, finalPath, 
OP_SAVE_TASK_MANIFEST);
+  }
+
+  /**
+   * Generate and save a task manifest or summary file.
+   * This is be done by writing to a temp path and then renaming.
+   * <p>
+   * If the destination path exists: Delete it before the rename.
+   * <p>
+   * This will retry so that a rename failure from abfs load or IO errors
+   * such as delete or save failure will not fail the task.
+   * <p>
+   * The {@code manifestSource} supplier is invoked to get the manifest data
+   * on every attempt.
+   * This permits statistics to be updated, <i>including those of failures</i>.
+   * @param manifestSource supplier the manifest/success file
+   * @param tempPath temp path for the initial save
+   * @param finalPath final path for rename.
+   * @param statistic statistic to use for timing
+   * @return the manifest saved.
+   * @throws IOException failure to save/delete/rename after retries.
+   */
+  @SuppressWarnings("unchecked")
+  protected final <T extends AbstractManifestData> T saveManifest(
+      final Supplier<T> manifestSource,
+      final Path tempPath,
+      final Path finalPath,
+      String statistic) throws IOException {
+
+    AtomicInteger retryCount = new AtomicInteger(0);
+    RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
+        getStageConfig().getManifestSaveAttempts(),
+        SAVE_SLEEP_INTERVAL,
+        TimeUnit.MILLISECONDS);
+
+    // loop until returning a value or raising an exception
+    while (true) {
+      try {
+        T manifestData = requireNonNull(manifestSource.get());
+        trackDurationOfInvocation(getIOStatistics(), statistic, () -> {
+          LOG.info("{}: save manifest to {} then rename as {}'); retry 
count={}",
+              getName(), tempPath, finalPath, retryCount);
+
+          // delete temp path.
+          // even though this is written with overwrite=true, this extra 
recursive
+          // delete also handles a directory being there.
+          deleteRecursive(tempPath, OP_DELETE);
+
+          // save the temp file, overwriting any which remains from an earlier 
attempt
+          operations.save(manifestData, tempPath, true);
+
+          // delete the destination in case it exists either from a failed 
previous
+          // attempt or from a concurrent task commit.
+          delete(finalPath, true, OP_DELETE);
+
+          // rename temp to final
+          renameFile(tempPath, finalPath);

Review Comment:
   1. renameFile javadocs `throws PathIOException – if the rename() call 
returned false.`. so no need to check the result here
   2. directory deletion, maybe: but what is going to create a directory here? 
nothing in the committer will, and if some other process is doing stuff in the 
job attempt dir you are doomed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to