This is an automated email from the ASF dual-hosted git repository. cnauroth pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 5cd006455d3 HADOOP-18582. skip unnecessary cleanup logic in distcp (#5251) 5cd006455d3 is described below commit 5cd006455d3318f186f5d57df1f3e8209664b1d7 Author: kevin wan <610379...@qq.com> AuthorDate: Wed Jan 25 07:49:32 2023 +0800 HADOOP-18582. skip unnecessary cleanup logic in distcp (#5251) Co-authored-by: 万康 <min...@xiaohongshu.com> Reviewed-by: Steve Loughran <ste...@apache.org> Signed-off-by: Ayush Saxena <ayushsax...@apache.org> Signed-off-by: Chris Nauroth <cnaur...@apache.org> (cherry picked from commit 3b7b79b37ae1045e413de309789fbb400817a081) --- .../apache/hadoop/tools/mapred/CopyCommitter.java | 13 +++- .../hadoop/tools/mapred/TestCopyCommitter.java | 71 ++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 2272781f724..e5c74094e90 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -149,9 +149,18 @@ public class CopyCommitter extends FileOutputCommitter { } private void cleanupTempFiles(JobContext context) { - try { - Configuration conf = context.getConfiguration(); + Configuration conf = context.getConfiguration(); + + final boolean directWrite = conf.getBoolean( + DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); + final boolean append = conf.getBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), false); + final boolean useTempTarget = !append && !directWrite; + if (!useTempTarget) { + return; + } + try { Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); FileSystem targetFS = targetWorkPath.getFileSystem(conf); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 599f3ec2db6..bda80a3d25e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.mapred; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -580,6 +581,76 @@ public class TestCopyCommitter { } } + @Test + public void testCommitWithCleanupTempFiles() throws IOException { + testCommitWithCleanup(true, false); + testCommitWithCleanup(false, true); + testCommitWithCleanup(true, true); + testCommitWithCleanup(false, false); + } + + private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID(); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + jobID); + Configuration conf = jobContext.getConfiguration(); + + String sourceBase; + String targetBase; + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + sourceBase = "/tmp1/" + rand.nextLong(); + targetBase = "/tmp1/" + rand.nextLong(); + + DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), + new Path("/out")) + .withAppend(append) + .withSyncFolder(true) + .withDirectWrite(directWrite) + .build(); + options.appendToConf(conf); + + DistCpContext context = new DistCpContext(options); + context.setTargetPathExists(false); + + + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + Path tempFilePath = getTempFile(targetBase, taskAttemptContext); + createDirectory(fs, tempFilePath); + + OutputCommitter committer = new CopyCommitter( + null, taskAttemptContext); + committer.commitJob(jobContext); + + if (append || directWrite) { + ContractTestUtils.assertPathExists(fs, "Temp files should not be cleanup with append or direct option", + tempFilePath); + } else { + ContractTestUtils.assertPathDoesNotExist( + fs, + "Temp files should be clean up without append or direct option", + tempFilePath); + } + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + TestDistCpUtils.delete(fs, "/meta"); + } + } + + private Path getTempFile(String targetWorkPath, TaskAttemptContext taskAttemptContext) { + Path tempFile = new Path(targetWorkPath, ".distcp.tmp." + + taskAttemptContext.getTaskAttemptID().toString() + + "." + System.currentTimeMillis()); + LOG.info("Creating temp file: {}", tempFile); + return tempFile; + } + /** * Create a source file and its DistCp working files with different checksum * to test the checksum validation for copying blocks in parallel. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org