This is an automated email from the ASF dual-hosted git repository. cnauroth pushed a commit to branch branch-3.4.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4.2 by this push: new 5986095d03e HADOOP-18568. magic committer optional cleanup 5986095d03e is described below commit 5986095d03ef53674a1a95b0149ae9ee6a675665 Author: Hossein Torabi <blck...@pm.me> AuthorDate: Fri Jun 20 18:36:16 2025 +0000 HADOOP-18568. magic committer optional cleanup Closes #7693 Co-authored-by: Chris Nauroth <cnaur...@apache.org> Signed-off-by: Chris Nauroth <cnaur...@apache.org> (cherry picked from commit 7acb89cb1576963c7b3b5ee062ce7a2f80ae2ad6) (cherry picked from commit 43322a9e3b882774593c0baad034a87afdf0f604) --- .../hadoop/fs/s3a/commit/CommitConstants.java | 12 ++++++++++ .../s3a/commit/magic/MagicCommitTrackerUtils.java | 12 ++++++++++ .../fs/s3a/commit/magic/MagicS3GuardCommitter.java | 23 ++++++++++-------- .../site/markdown/tools/hadoop-aws/committers.md | 18 ++++++++++++++ .../s3a/commit/magic/ITestMagicCommitProtocol.java | 28 ++++++++++++++++++++++ 5 files changed, 83 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index 4f000550993..2e4cf6fc4a3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -258,6 +258,18 @@ private CommitConstants() { public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT = false; + /** + * Should Magic committer cleanup all the staging dirs. + */ + public static final String FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED = + "fs.s3a.committer.magic.cleanup.enabled"; + + /** + * Default value for {@link #FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED}: {@value}. 
+ */ + public static final boolean FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT = + true; + /** * Path in the cluster filesystem for temporary data: {@value}. * This is for HDFS, not the local filesystem. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java index 2ceac1c8e03..1db36dc5067 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java @@ -61,4 +61,16 @@ public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) { CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT); } + + /** + * Is cleanup of magic committer staging dirs enabled. + * @param conf Configuration + * @return true if cleanup of staging dir is enabled. 
+ */ + public static boolean isCleanupMagicCommitterEnabled( + Configuration conf) { + return conf.getBoolean( + CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, + CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 5ed1a3abd46..ce8436d3921 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -50,6 +50,7 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isCleanupMagicCommitterEnabled; import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; @@ -131,16 +132,18 @@ protected ActiveCommit listPendingUploadsToCommit( * Delete the magic directory. 
*/ public void cleanupStagingDirs() { - final Path out = getOutputPath(); - Path path = getMagicJobPath(getUUID(), out); - try(DurationInfo ignored = new DurationInfo(LOG, true, - "Deleting magic directory %s", path)) { - Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), - () -> deleteWithWarning(getDestFS(), path, true)); - // and the job temp directory with manifests - Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(), - () -> deleteWithWarning(getDestFS(), - new Path(out, TEMP_DATA), true)); + if (isCleanupMagicCommitterEnabled(getConf())) { + final Path out = getOutputPath(); + Path path = getMagicJobPath(getUUID(), out); + try(DurationInfo ignored = new DurationInfo(LOG, true, + "Deleting magic directory %s", path)) { + Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), + () -> deleteWithWarning(getDestFS(), path, true)); + // and the job temp directory with manifests + Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(), + () -> deleteWithWarning(getDestFS(), + new Path(out, TEMP_DATA), true)); + } } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index 2dff5b79914..7d0e6c2bfaa 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -558,6 +558,7 @@ The table below provides a summary of each option. | `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files.| -4 | | `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` | | `fs.s3a.committer.require.uuid` |Require the Job UUID to be passed down from Spark | `false` | +| `fs.s3a.committer.magic.cleanup.enabled` | Cleanup the magic path after the job is committed. 
| `true` | The examples below shows how these options can be configured in XML. @@ -1058,3 +1059,20 @@ one of the following conditions are met 1. The committer is being used in spark, and the version of spark being used does not set the `spark.sql.sources.writeJobUUID` property. Either upgrade to a new spark release, or set `fs.s3a.committer.generate.uuid` to true. + +### Long Job Completion Time Due to Magic Committer Cleanup +When using the S3A Magic Committer in large Spark or MapReduce jobs, job completion can be significantly delayed +due to the cleanup of temporary files (such as those under the `__magic` directory). +This happens because deleting many small files in S3 is a slow and expensive operation, especially at scale. +In some cases, the cleanup phase alone can take several minutes or more — even after all data has already been written. + +To reduce this overhead, Hadoop 3.4.2 introduced a configuration option in +[HADOOP-18568](https://issues.apache.org/jira/browse/HADOOP-18568) that allows users to disable this automatic cleanup +and instead rely on S3 bucket lifecycle policies to delete the temporary files. With cleanup disabled, the `__magic` directory and the job's temporary data directory are left in the bucket after the job is committed, so a lifecycle rule (or some other process) must remove them.
+#### Configuration +```xml +<property> + <name>fs.s3a.committer.magic.cleanup.enabled</name> + <value>false</value> +</property> +``` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index cbfc23a2a29..f848c67a13b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -213,6 +213,34 @@ public void testCommittersPathsHaveUUID() throws Throwable { .contains(ta0); } + /** + * Verify that job commit deletes the job attempt path when magic committer cleanup is enabled, and retains it when cleanup is disabled. + */ + @Test + public void testCommitterCleanup() throws Throwable { + describe("Committer cleanup enabled. hence it should delete the task attempt path after commit"); + JobData jobData = startJob(true); + JobContext jContext = jobData.getJContext(); + TaskAttemptContext tContext = jobData.getTContext(); + AbstractS3ACommitter committer = jobData.getCommitter(); + + commit(committer, jContext, tContext); + assertJobAttemptPathDoesNotExist(committer, jContext); + + describe("Committer cleanup is disabled. hence it should not delete the task attempt path after commit"); + JobData jobData2 = startJob(true); + JobContext jContext2 = jobData2.getJContext(); + TaskAttemptContext tContext2 = jobData2.getTContext(); + AbstractS3ACommitter committer2 = jobData2.getCommitter(); + + committer2.getConf().setBoolean(FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, false); + + + commit(committer2, jContext2, tContext2); + assertJobAttemptPathExists(committer2, jContext2); + } + + /** * The class provides a overridden implementation of commitJobInternal which * causes the commit failed for the first time then succeed.
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org