saxenapranav commented on code in PR #6716:
URL: https://github.com/apache/hadoop/pull/6716#discussion_r1577262719


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java:
##########
@@ -582,19 +611,111 @@ protected final Path directoryMustExist(
    * Save a task manifest or summary. This will be done by
    * writing to a temp path and then renaming.
    * If the destination path exists: Delete it.
+   * This will retry so that a rename failure from abfs load or IO errors
+   * will not fail the task.
    * @param manifestData the manifest/success file
    * @param tempPath temp path for the initial save
    * @param finalPath final path for rename.
-   * @throws IOException failure to load/parse
+   * @return the manifest saved.
+   * @throws IOException failure to rename after retries.
    */
   @SuppressWarnings("unchecked")
-  protected final <T extends AbstractManifestData> void save(T manifestData,
+  protected final <T extends AbstractManifestData> T save(
+      final T manifestData,
       final Path tempPath,
       final Path finalPath) throws IOException {
-    LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, 
finalPath);
-    trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
-        operations.save(manifestData, tempPath, true));
-    renameFile(tempPath, finalPath);
+    return saveManifest(() -> manifestData, tempPath, finalPath, 
OP_SAVE_TASK_MANIFEST);
+  }
+
+  /**
+   * Generate and save a task manifest or summary file.
+   * This is be done by writing to a temp path and then renaming.
+   * <p>
+   * If the destination path exists: Delete it before the rename.
+   * <p>
+   * This will retry so that a rename failure from abfs load or IO errors
+   * such as delete or save failure will not fail the task.
+   * <p>
+   * The {@code manifestSource} supplier is invoked to get the manifest data
+   * on every attempt.
+   * This permits statistics to be updated, <i>including those of failures</i>.
+   * @param manifestSource supplier the manifest/success file
+   * @param tempPath temp path for the initial save
+   * @param finalPath final path for rename.
+   * @param statistic statistic to use for timing
+   * @return the manifest saved.
+   * @throws IOException failure to save/delete/rename after retries.
+   */
+  @SuppressWarnings("unchecked")
+  protected final <T extends AbstractManifestData> T saveManifest(
+      final Supplier<T> manifestSource,
+      final Path tempPath,
+      final Path finalPath,
+      String statistic) throws IOException {
+
+    AtomicInteger retryCount = new AtomicInteger(0);
+    RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
+        getStageConfig().getManifestSaveAttempts(),
+        SAVE_SLEEP_INTERVAL,
+        TimeUnit.MILLISECONDS);
+
+    // loop until returning a value or raising an exception
+    while (true) {
+      try {
+        T manifestData = requireNonNull(manifestSource.get());
+        trackDurationOfInvocation(getIOStatistics(), statistic, () -> {
+          LOG.info("{}: save manifest to {} then rename as {}'); retry 
count={}",
+              getName(), tempPath, finalPath, retryCount);
+
+          // delete temp path.
+          // even though this is written with overwrite=true, this extra 
recursive
+          // delete also handles a directory being there.
+          deleteRecursive(tempPath, OP_DELETE);
+
+          // save the temp file, overwriting any which remains from an earlier 
attempt
+          operations.save(manifestData, tempPath, true);
+
+          // delete the destination in case it exists either from a failed 
previous
+          // attempt or from a concurrent task commit.
+          delete(finalPath, true, OP_DELETE);
+
+          // rename temp to final
+          renameFile(tempPath, finalPath);

Review Comment:
   Got your point for the directory case. 
   
   For the first point, I now understand that `executeRenamingOperation` would 
call `escalateRenameFailure` on fs.rename() failure which would raise 
PathIOException. I was thinking if instead of calling `renameFile` if we can do 
`operation.renameFile()` directly and raise exception from there. Reason being, 
`escalateRenameFailure` does a getFileStatus on both src and dst for logging. 
We can save 2 filesystem calls if we know the renameFile for the saveManifest 
has failed. Would like to know your view. But, I am good with this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to