Author: kihwal
Date: Tue Apr 16 22:07:14 2013
New Revision: 1468636

URL: http://svn.apache.org/r1468636
Log:
MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes are different on source/target. Contributed by Mithun Radhakrishnan.

Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1468636&r1=1468635&r2=1468636&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java Tue Apr 16 22:07:14 2013 @@ -255,7 +255,7 @@ public class CopyMapper extends Mapper<T long bytesCopied; try { - bytesCopied = (Long)new RetriableFileCopyCommand(description) + bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description) .execute(sourceFileStatus, target, context, fileAttributes); } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1468636&r1=1468635&r2=1468636&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Tue Apr 16 22:07:14 2013 @@ -41,6 +41,7 @@ public class RetriableFileCopyCommand ex private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class); private static int BUFFER_SIZE = 8 * 1024; + private boolean skipCrc = false; /** * Constructor, taking a description of the action. @@ -51,6 +52,17 @@ public class RetriableFileCopyCommand ex } /** + * Create a RetriableFileCopyCommand. + * + * @param skipCrc Whether to skip the crc check. + * @param description A verbose description of the copy operation. + */ + public RetriableFileCopyCommand(boolean skipCrc, String description) { + this(description); + this.skipCrc = skipCrc; + } + + /** * Implementation of RetriableCommand::doExecute(). * This is the actual copy-implementation. * @param arguments Argument-list to the command. @@ -92,7 +104,7 @@ public class RetriableFileCopyCommand ex compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead); //At this point, src&dest lengths are same. 
if length==0, we skip checksum - if (bytesRead != 0) { + if (bytesRead != 0 && !skipCrc) { compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath); } promoteTmpToTarget(tmpTargetPath, target, targetFS); @@ -128,10 +140,17 @@ public class RetriableFileCopyCommand ex private void compareCheckSums(FileSystem sourceFS, Path source, FileSystem targetFS, Path target) throws IOException { - if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) - throw new IOException("Check-sum mismatch between " - + source + " and " + target); - + if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) { + StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ") + .append(source).append(" and ").append(target).append("."); + if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) { + errorMessage.append(" Source and target differ in block-size.") + .append(" Use -pb to preserve block-sizes during copy.") + .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.") + .append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)"); + } + throw new IOException(errorMessage.toString()); + } } //If target file exists and unable to delete target - fail Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1468636&r1=1468635&r2=1468636&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (original) +++ 
hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Tue Apr 16 22:07:14 2013 @@ -21,6 +21,7 @@ package org.apache.hadoop.tools.mapred; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -53,7 +54,7 @@ public class TestCopyMapper { private static final Log LOG = LogFactory.getLog(TestCopyMapper.class); private static List<Path> pathList = new ArrayList<Path>(); private static int nFiles = 0; - private static final int FILE_SIZE = 1024; + private static final int DEFAULT_FILE_SIZE = 1024; private static MiniDFSCluster cluster; @@ -92,7 +93,7 @@ public class TestCopyMapper { configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false); configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), - true); + false); configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), true); configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), @@ -112,6 +113,18 @@ public class TestCopyMapper { touchFile(SOURCE_PATH + "/7/8/9"); } + private static void createSourceDataWithDifferentBlockSize() throws Exception { + mkdirs(SOURCE_PATH + "/1"); + mkdirs(SOURCE_PATH + "/2"); + mkdirs(SOURCE_PATH + "/2/3/4"); + mkdirs(SOURCE_PATH + "/2/3"); + mkdirs(SOURCE_PATH + "/5"); + touchFile(SOURCE_PATH + "/5/6", true); + mkdirs(SOURCE_PATH + "/7"); + mkdirs(SOURCE_PATH + "/7/8"); + touchFile(SOURCE_PATH + "/7/8/9"); + } + private static void mkdirs(String path) throws Exception { FileSystem fileSystem = cluster.getFileSystem(); final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(), @@ -121,17 +134,31 @@ public class TestCopyMapper { } private static void touchFile(String path) throws Exception { + 
touchFile(path, false); + } + + private static void touchFile(String path, boolean createMultipleBlocks) throws Exception { + final long NON_DEFAULT_BLOCK_SIZE = 4096; FileSystem fs; DataOutputStream outputStream = null; try { fs = cluster.getFileSystem(); final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), fs.getWorkingDirectory()); - final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2; + final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2; outputStream = fs.create(qualifiedPath, true, 0, (short)(fs.getDefaultReplication(qualifiedPath)*2), blockSize); - outputStream.write(new byte[FILE_SIZE]); + byte[] bytes = new byte[DEFAULT_FILE_SIZE]; + outputStream.write(bytes); + long fileSize = DEFAULT_FILE_SIZE; + if (createMultipleBlocks) { + while (fileSize < 2*blockSize) { + outputStream.write(bytes); + outputStream.flush(); + fileSize += DEFAULT_FILE_SIZE; + } + } pathList.add(qualifiedPath); ++nFiles; @@ -144,7 +171,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testRun() { try { deleteState(); @@ -179,7 +206,7 @@ public class TestCopyMapper { Assert.assertEquals(pathList.size(), stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue()); - Assert.assertEquals(nFiles * FILE_SIZE, + Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); testCopyingExistingFiles(fs, copyMapper, context); @@ -211,7 +238,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testMakeDirFailure() { try { deleteState(); @@ -239,13 +266,13 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testIgnoreFailures() { doTestIgnoreFailures(true); doTestIgnoreFailures(false); } - @Test + @Test(timeout=40000) public void testDirToFile() { try { deleteState(); @@ -273,7 +300,7 @@ public class TestCopyMapper { } } - @Test + 
@Test(timeout=40000) public void testPreserve() { try { deleteState(); @@ -343,7 +370,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testCopyReadableFiles() { try { deleteState(); @@ -406,7 +433,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testSkipCopyNoPerms() { try { deleteState(); @@ -480,7 +507,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testFailCopyWithAccessControlException() { try { deleteState(); @@ -558,7 +585,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testFileToDir() { try { deleteState(); @@ -635,12 +662,48 @@ public class TestCopyMapper { cluster.getFileSystem().delete(new Path(TARGET_PATH), true); } - @Test + @Test(timeout=40000) public void testPreserveBlockSizeAndReplication() { testPreserveBlockSizeAndReplicationImpl(true); testPreserveBlockSizeAndReplicationImpl(false); } + @Test(timeout=40000) + public void testCopyFailOnBlockSizeDifference() { + try { + + deleteState(); + createSourceDataWithDifferentBlockSize(); + + FileSystem fs = cluster.getFileSystem(); + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(getConfiguration(), null, 0); + Mapper<Text, FileStatus, Text, Text>.Context context + = stubContext.getContext(); + + Configuration configuration = context.getConfiguration(); + EnumSet<DistCpOptions.FileAttribute> fileAttributes + = EnumSet.noneOf(DistCpOptions.FileAttribute.class); + configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), + DistCpUtils.packAttributes(fileAttributes)); + + copyMapper.setup(context); + + for (Path path : pathList) { + final FileStatus fileStatus = fs.getFileStatus(path); + copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), + fileStatus, context); + } + + Assert.fail("Copy should have failed because of block-size difference."); + } + catch (Exception exception) { + // Check that 
the exception suggests the use of -pb/-skipCrc. + Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getMessage().contains("pb")); + Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getMessage().contains("skipCrc")); + } + } + private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){ try { @@ -712,7 +775,7 @@ public class TestCopyMapper { * If a single file is being copied to a location where the file (of the same * name) already exists, then the file shouldn't be skipped. */ - @Test + @Test(timeout=40000) public void testSingleFileCopy() { try { deleteState(); @@ -761,7 +824,7 @@ public class TestCopyMapper { } } - @Test + @Test(timeout=40000) public void testPreserveUserGroup() { testPreserveUserGroupImpl(true); testPreserveUserGroupImpl(false);