[ https://issues.apache.org/jira/browse/MAPREDUCE-7474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840286#comment-17840286 ]
ASF GitHub Bot commented on MAPREDUCE-7474: ------------------------------------------- saxenapranav commented on code in PR #6716: URL: https://github.com/apache/hadoop/pull/6716#discussion_r1577262719 ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java: ########## @@ -582,19 +611,111 @@ protected final Path directoryMustExist( * Save a task manifest or summary. This will be done by * writing to a temp path and then renaming. * If the destination path exists: Delete it. + * This will retry so that a rename failure from abfs load or IO errors + * will not fail the task. * @param manifestData the manifest/success file * @param tempPath temp path for the initial save * @param finalPath final path for rename. - * @throws IOException failure to load/parse + * @return the manifest saved. + * @throws IOException failure to rename after retries. */ @SuppressWarnings("unchecked") - protected final <T extends AbstractManifestData> void save(T manifestData, + protected final <T extends AbstractManifestData> T save( + final T manifestData, final Path tempPath, final Path finalPath) throws IOException { - LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath); - trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () -> - operations.save(manifestData, tempPath, true)); - renameFile(tempPath, finalPath); + return saveManifest(() -> manifestData, tempPath, finalPath, OP_SAVE_TASK_MANIFEST); + } + + /** + * Generate and save a task manifest or summary file. + * This is be done by writing to a temp path and then renaming. + * <p> + * If the destination path exists: Delete it before the rename. + * <p> + * This will retry so that a rename failure from abfs load or IO errors + * such as delete or save failure will not fail the task. + * <p> + * The {@code manifestSource} supplier is invoked to get the manifest data + * on every attempt. + * This permits statistics to be updated, <i>including those of failures</i>. + * @param manifestSource supplier the manifest/success file + * @param tempPath temp path for the initial save + * @param finalPath final path for rename. + * @param statistic statistic to use for timing + * @return the manifest saved. + * @throws IOException failure to save/delete/rename after retries. + */ + @SuppressWarnings("unchecked") + protected final <T extends AbstractManifestData> T saveManifest( + final Supplier<T> manifestSource, + final Path tempPath, + final Path finalPath, + String statistic) throws IOException { + + AtomicInteger retryCount = new AtomicInteger(0); + RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep( + getStageConfig().getManifestSaveAttempts(), + SAVE_SLEEP_INTERVAL, + TimeUnit.MILLISECONDS); + + // loop until returning a value or raising an exception + while (true) { + try { + T manifestData = requireNonNull(manifestSource.get()); + trackDurationOfInvocation(getIOStatistics(), statistic, () -> { + LOG.info("{}: save manifest to {} then rename as {}'); retry count={}", + getName(), tempPath, finalPath, retryCount); + + // delete temp path. + // even though this is written with overwrite=true, this extra recursive + // delete also handles a directory being there. + deleteRecursive(tempPath, OP_DELETE); + + // save the temp file, overwriting any which remains from an earlier attempt + operations.save(manifestData, tempPath, true); + + // delete the destination in case it exists either from a failed previous + // attempt or from a concurrent task commit. + delete(finalPath, true, OP_DELETE); + + // rename temp to final + renameFile(tempPath, finalPath); Review Comment: Got your point for the directory case. For the first point, I now understand that `executeRenamingOperation` would call `escalateRenameFailure` on fs.rename() failure which would raise PathIOException. I was thinking if instead of calling `renameFile` if we can do `operation.renameFile()` directly and raise exception from there. Reason being, `escalateRenameFailure` does a getFileStatus on both src and dst for logging. We can save 2 filesystem calls if we know the renameFile for the saveManifest has failed. Would like to know your view. But, I am good with this comment. > [ABFS] Improve commit resilience and performance in Manifest Committer > ---------------------------------------------------------------------- > > Key: MAPREDUCE-7474 > URL: https://issues.apache.org/jira/browse/MAPREDUCE-7474 > Project: Hadoop Map/Reduce > Issue Type: Bug > Components: client > Affects Versions: 3.4.0, 3.3.6 > Reporter: Steve Loughran > Assignee: Steve Loughran > Priority: Major > Labels: pull-request-available > > * Manifest committer is not resilient to rename failures on task commit > without HADOOP-18012 rename recovery enabled. > * large burst of delete calls noted: are they needed? > relates to HADOOP-19093 but takes a more minimal approach with goal of > changes in manifest committer only. > Initial proposed changes > * retry recovery on task commit rename, always (repeat save, delete, rename) > * audit delete use and see if it can be pruned -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org