This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5cd006455d3 HADOOP-18582. skip unnecessary cleanup logic in distcp (#5251)
5cd006455d3 is described below

commit 5cd006455d3318f186f5d57df1f3e8209664b1d7
Author: kevin wan <610379...@qq.com>
AuthorDate: Wed Jan 25 07:49:32 2023 +0800

    HADOOP-18582. skip unnecessary cleanup logic in distcp (#5251)
    
    Co-authored-by: 万康 <min...@xiaohongshu.com>
    Reviewed-by: Steve Loughran <ste...@apache.org>
    Signed-off-by: Ayush Saxena <ayushsax...@apache.org>
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
    (cherry picked from commit 3b7b79b37ae1045e413de309789fbb400817a081)
---
 .../apache/hadoop/tools/mapred/CopyCommitter.java  | 13 +++-
 .../hadoop/tools/mapred/TestCopyCommitter.java     | 71 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 2272781f724..e5c74094e90 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -149,9 +149,18 @@ public class CopyCommitter extends FileOutputCommitter {
   }
 
   private void cleanupTempFiles(JobContext context) {
-    try {
-      Configuration conf = context.getConfiguration();
+    Configuration conf = context.getConfiguration();
+
+    final boolean directWrite = conf.getBoolean(
+        DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
+    final boolean append = conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false);
+    final boolean useTempTarget = !append && !directWrite;
+    if (!useTempTarget) {
+      return;
+    }
 
+    try {
       Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
       FileSystem targetFS = targetWorkPath.getFileSystem(conf);
 
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 599f3ec2db6..bda80a3d25e 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools.mapred;
 
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -580,6 +581,76 @@ public class TestCopyCommitter {
     }
   }
 
+  @Test
+  public void testCommitWithCleanupTempFiles() throws IOException {
+    testCommitWithCleanup(true, false);
+    testCommitWithCleanup(false, true);
+    testCommitWithCleanup(true, true);
+    testCommitWithCleanup(false, false);
+  }
+
+  private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID();
+    JobContext jobContext = new JobContextImpl(
+        taskAttemptContext.getConfiguration(),
+        jobID);
+    Configuration conf = jobContext.getConfiguration();
+
+    String sourceBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(conf);
+      sourceBase = "/tmp1/" + rand.nextLong();
+      targetBase = "/tmp1/" + rand.nextLong();
+
+      DistCpOptions options = new DistCpOptions.Builder(
+          Collections.singletonList(new Path(sourceBase)),
+          new Path("/out"))
+          .withAppend(append)
+          .withSyncFolder(true)
+          .withDirectWrite(directWrite)
+          .build();
+      options.appendToConf(conf);
+
+      DistCpContext context = new DistCpContext(options);
+      context.setTargetPathExists(false);
+
+
+      conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+      Path tempFilePath = getTempFile(targetBase, taskAttemptContext);
+      createDirectory(fs, tempFilePath);
+
+      OutputCommitter committer = new CopyCommitter(
+          null, taskAttemptContext);
+      committer.commitJob(jobContext);
+
+      if (append || directWrite) {
+        ContractTestUtils.assertPathExists(fs, "Temp files should not be cleanup with append or direct option",
+            tempFilePath);
+      } else {
+        ContractTestUtils.assertPathDoesNotExist(
+            fs,
+            "Temp files should be clean up without append or direct option",
+            tempFilePath);
+      }
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1");
+      TestDistCpUtils.delete(fs, "/meta");
+    }
+  }
+
+  private Path getTempFile(String targetWorkPath, TaskAttemptContext taskAttemptContext) {
+    Path tempFile = new Path(targetWorkPath, ".distcp.tmp." +
+        taskAttemptContext.getTaskAttemptID().toString() +
+        "." + System.currentTimeMillis());
+    LOG.info("Creating temp file: {}", tempFile);
+    return tempFile;
+  }
+
   /**
    * Create a source file and its DistCp working files with different checksum
    * to test the checksum validation for copying blocks in parallel.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to