This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4.2 by this push:
     new 5986095d03e HADOOP-18568. magic committer optional cleanup
5986095d03e is described below

commit 5986095d03ef53674a1a95b0149ae9ee6a675665
Author: Hossein Torabi <blck...@pm.me>
AuthorDate: Fri Jun 20 18:36:16 2025 +0000

    HADOOP-18568. magic committer optional cleanup
    
    Closes #7693
    
    Co-authored-by: Chris Nauroth <cnaur...@apache.org>
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
    (cherry picked from commit 7acb89cb1576963c7b3b5ee062ce7a2f80ae2ad6)
    (cherry picked from commit 43322a9e3b882774593c0baad034a87afdf0f604)
---
 .../hadoop/fs/s3a/commit/CommitConstants.java      | 12 ++++++++++
 .../s3a/commit/magic/MagicCommitTrackerUtils.java  | 12 ++++++++++
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java | 23 ++++++++++--------
 .../site/markdown/tools/hadoop-aws/committers.md   | 18 ++++++++++++++
 .../s3a/commit/magic/ITestMagicCommitProtocol.java | 28 ++++++++++++++++++++++
 5 files changed, 83 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 4f000550993..2e4cf6fc4a3 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -258,6 +258,18 @@ private CommitConstants() {
   public static final boolean 
FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
       false;
 
+  /**
+   * Whether the magic committer should clean up all the staging dirs.
+   */
+  public static final String FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED =
+      "fs.s3a.committer.magic.cleanup.enabled";
+
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED}: {@value}.
+   */
+  public static final boolean FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT =
+      true;
+
   /**
    * Path  in the cluster filesystem for temporary data: {@value}.
    * This is for HDFS, not the local filesystem.
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
index 2ceac1c8e03..1db36dc5067 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
@@ -61,4 +61,16 @@ public static boolean 
isTrackMagicCommitsInMemoryEnabled(Configuration conf) {
         CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
         
CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
   }
+
+  /**
+   * Is cleanup of magic committer staging dirs enabled.
+   * @param conf Configuration
+   * @return true if cleanup of staging dir is enabled.
+   */
+  public static boolean isCleanupMagicCommitterEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED,
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 5ed1a3abd46..ce8436d3921 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -50,6 +50,7 @@
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
+import static 
org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isCleanupMagicCommitterEnabled;
 import static 
org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 
@@ -131,16 +132,18 @@ protected ActiveCommit listPendingUploadsToCommit(
    * Delete the magic directory.
    */
   public void cleanupStagingDirs() {
-    final Path out = getOutputPath();
-    Path path = getMagicJobPath(getUUID(), out);
-    try(DurationInfo ignored = new DurationInfo(LOG, true,
-        "Deleting magic directory %s", path)) {
-      Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", 
path.toString(),
-          () -> deleteWithWarning(getDestFS(), path, true));
-      // and the job temp directory with manifests
-      Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
-          () -> deleteWithWarning(getDestFS(),
-              new Path(out, TEMP_DATA), true));
+    if (isCleanupMagicCommitterEnabled(getConf())) {
+      final Path out = getOutputPath();
+      Path path = getMagicJobPath(getUUID(), out);
+      try(DurationInfo ignored = new DurationInfo(LOG, true,
+              "Deleting magic directory %s", path)) {
+        Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", 
path.toString(),
+                () -> deleteWithWarning(getDestFS(), path, true));
+        // and the job temp directory with manifests
+        Invoker.ignoreIOExceptions(LOG, "cleanup job directory", 
path.toString(),
+                () -> deleteWithWarning(getDestFS(),
+                        new Path(out, TEMP_DATA), true));
+      }
     }
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md 
b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 2dff5b79914..7d0e6c2bfaa 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -558,6 +558,7 @@ The table below provides a summary of each option.
 | `fs.s3a.committer.threads` | Number of threads in committers for parallel 
operations on files.| -4 |
 | `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed 
down from Spark | `false` |
 | `fs.s3a.committer.require.uuid` |Require the Job UUID to be passed down from 
Spark | `false` |
+| `fs.s3a.committer.magic.cleanup.enabled` | Clean up the magic path after the job is committed. | `true` |
 
 The examples below shows how these options can be configured in XML.
 
@@ -1058,3 +1059,20 @@ one of the following conditions are met
 1. The committer is being used in spark, and the version of spark being used 
does not
    set the `spark.sql.sources.writeJobUUID` property.
    Either upgrade to a new spark release, or set 
`fs.s3a.committer.generate.uuid` to true.
+
+### Long Job Completion Time Due to Magic Committer Cleanup
+When using the S3A Magic Committer in large Spark or MapReduce jobs, job completion can be significantly delayed
+due to the cleanup of temporary files (such as those under the `__magic` directory).
+This happens because deleting many small files in S3 is a slow and expensive operation, especially at scale.
+In some cases, the cleanup phase alone can take several minutes or more — even after all data has already been written.
+
+To reduce this overhead, Hadoop 3.4.2 introduced a configuration option in
+[HADOOP-18568](https://issues.apache.org/jira/browse/HADOOP-18568) that allows users to disable this automatic cleanup
+and instead rely on S3 bucket lifecycle policies to remove the leftover temporary files.
+#### Configuration
+```xml
+<property>
+  <name>fs.s3a.committer.magic.cleanup.enabled</name>
+  <value>false</value>
+</property>
+```
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index cbfc23a2a29..f848c67a13b 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -213,6 +213,34 @@ public void testCommittersPathsHaveUUID() throws Throwable 
{
         .contains(ta0);
   }
 
+  /**
+   * Verify that magic committer cleanup deletes the job attempt path when enabled and keeps it when disabled.
+   */
+  @Test
+  public void testCommitterCleanup() throws Throwable {
+    describe("Committer cleanup enabled. hence it should delete the task 
attempt path after commit");
+    JobData jobData = startJob(true);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    AbstractS3ACommitter committer = jobData.getCommitter();
+
+    commit(committer, jContext, tContext);
+    assertJobAttemptPathDoesNotExist(committer, jContext);
+
+    describe("Committer cleanup is disabled. hence it should not delete the 
task attempt path after commit");
+    JobData jobData2 = startJob(true);
+    JobContext jContext2 = jobData2.getJContext();
+    TaskAttemptContext tContext2 = jobData2.getTContext();
+    AbstractS3ACommitter committer2 = jobData2.getCommitter();
+
+    committer2.getConf().setBoolean(FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, 
false);
+
+
+    commit(committer2, jContext2, tContext2);
+    assertJobAttemptPathExists(committer2, jContext2);
+  }
+
+
   /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to