Author: jing9
Date: Thu May 22 18:17:11 2014
New Revision: 1596931

URL: http://svn.apache.org/r1596931
Log: MAPREDUCE-5899. Support incremental data copy in DistCp. Contributed by Jing Zhao.
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Thu May 22 18:17:11 2014
@@ -116,7 +116,7 @@ public class Hdfs extends AbstractFileSy
   @Override
   public FileChecksum getFileChecksum(Path f)
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f));
+    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
  }
 
   @Override
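The existing whole-file entry points keep their signatures and simply pass Long.MAX_VALUE as the range length. A minimal sketch of why that sentinel is safe, using a hypothetical helper that is not part of the patch: block locations are only fetched for the range [0, length), so any length at or beyond the file size degenerates to the whole file.

    // Hypothetical helper, for illustration only: the range the client ends up
    // checksumming is bounded by the file's actual length, so Long.MAX_VALUE
    // means "to the end of the file".
    static long effectiveRangeLength(long requestedLength, long fileLength) {
      return Math.min(requestedLength, fileLength);
    }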
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu May 22 18:17:11 2014
@@ -1801,15 +1801,19 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
+   * @param length The length of the range
    * @return The checksum
    * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
     checkOpen();
-    return getFileChecksum(src, clientName, namenode, socketFactory,
-        dfsClientConf.socketTimeout, getDataEncryptionKey(),
+    Preconditions.checkArgument(length >= 0);
+    return getFileChecksum(src, length, clientName, namenode,
+        socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
 
@@ -1850,8 +1854,9 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file.
    * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length)
    * @param clientName the name of the client requesting the checksum.
    * @param namenode the RPC proxy for the namenode
    * @param socketFactory to create sockets to connect to DNs
@@ -1861,12 +1866,13 @@
    * @return The checksum
    */
   private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      String clientName,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      long length, String clientName, ClientProtocol namenode,
+      SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
-    //get all block locations
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+    //get block locations for the file range
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+        length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1878,10 +1884,11 @@ public class DFSClient implements java.i
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
 
-    //get block checksum for each block
-    for(int i = 0; i < locatedblocks.size(); i++) {
+    // get block checksum for each block
+    long remaining = length;
+    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
       if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+        blockLocations = callGetBlockLocations(namenode, src, 0, length);
         if (null == blockLocations) {
           throw new FileNotFoundException("File does not exist: " + src);
         }
@@ -1890,6 +1897,10 @@ public class DFSClient implements java.i
       }
       LocatedBlock lb = locatedblocks.get(i);
       final ExtendedBlock block = lb.getBlock();
+      if (remaining < block.getNumBytes()) {
+        block.setNumBytes(remaining);
+      }
+      remaining -= block.getNumBytes();
       final DatanodeInfo[] datanodes = lb.getLocations();
 
       //try each datanode location of the block
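The new loop walks the located blocks in order, trims the last block that intersects the requested range via block.setNumBytes(remaining), and stops once the range is covered. A standalone sketch of the same trimming arithmetic (plain Java, not Hadoop types; the block sizes and range are made up):

    import java.util.Arrays;
    import java.util.List;

    public class RangeTrimSketch {
      public static void main(String[] args) {
        final long length = 2500L;                          // requested range [0, 2500)
        final List<Long> blockSizes = Arrays.asList(1024L, 1024L, 1024L);
        long remaining = length;
        for (int i = 0; i < blockSizes.size() && remaining > 0; i++) {
          // Trim the final intersecting block, as the patch does with
          // block.setNumBytes(remaining); later blocks are skipped entirely.
          long numBytes = Math.min(blockSizes.get(i), remaining);
          remaining -= numBytes;
          System.out.println("block " + i + ": checksum first " + numBytes + " bytes");
        }
      }
    }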
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu May 22 18:17:11 2014
@@ -68,14 +68,12 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -85,7 +83,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
@@ -1142,7 +1139,7 @@ public class DistributedFileSystem exten
       @Override
       public FileChecksum doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return dfs.getFileChecksum(getPathName(p));
+        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1154,6 +1151,32 @@ public class DistributedFileSystem exten
   }
 
   @Override
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p), length);
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+        } else {
+          throw new UnsupportedFileSystemException(
+              "getFileChecksum(Path, long) is not supported by "
+                  + fs.getClass().getSimpleName());
+        }
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
   public void setPermission(Path p, final FsPermission permission
       ) throws IOException {
     statistics.incrementWriteOps(1);
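This is the public API that incremental copy builds on: checksum only the first N bytes of the source and compare against the whole-file checksum of a partially copied target. A hedged usage sketch follows; the paths, the cast, and the surrounding class are illustrative and not from the patch. Note that MD5-of-MD5-of-CRC32 checksums are only comparable when both files use the same block size and bytes-per-CRC.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class PrefixChecksumCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes fs.defaultFS points at an HDFS cluster.
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        Path src = new Path("/src/data");   // hypothetical source file
        Path tgt = new Path("/tgt/data");   // hypothetical partial copy of src
        long copied = dfs.getFileStatus(tgt).getLen();
        FileChecksum srcPrefix = dfs.getFileChecksum(src, copied);
        FileChecksum tgtWhole = dfs.getFileChecksum(tgt);
        // Matching prefix checksums suggest the copy can resume by appending
        // the remaining bytes instead of re-copying the whole file.
        System.out.println("prefix matches: " + srcPrefix.equals(tgtWhole));
      }
    }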
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu May 22 18:17:11 2014
@@ -42,6 +42,7 @@
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
@@ -802,7 +804,44 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(out);
     }
   }
-  
+
+  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+      throws IOException {
+    final int bytesPerCRC = checksum.getBytesPerChecksum();
+    final int csize = checksum.getChecksumSize();
+    final byte[] buffer = new byte[4*1024];
+    MessageDigest digester = MD5Hash.getDigester();
+
+    long remaining = requestLength / bytesPerCRC * csize;
+    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+      toDigest = checksumIn.read(buffer, 0,
+          (int) Math.min(remaining, buffer.length));
+      if (toDigest < 0) {
+        break;
+      }
+      digester.update(buffer, 0, toDigest);
+    }
+
+    int partialLength = (int) (requestLength % bytesPerCRC);
+    if (partialLength > 0) {
+      byte[] buf = new byte[partialLength];
+      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+          requestLength - partialLength);
+      try {
+        // Read the partial chunk of block data so its CRC can be computed.
+        IOUtils.readFully(blockIn, buf, 0, partialLength);
+      } finally {
+        IOUtils.closeStream(blockIn);
+      }
+      checksum.update(buf, 0, partialLength);
+      byte[] partialCrc = new byte[csize];
+      checksum.writeValue(partialCrc, 0, true);
+      digester.update(partialCrc);
+    }
+    return new MD5Hash(digester.digest());
+  }
+
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +849,32 @@ class DataXceiver extends Receiver imple
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
-    updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn =
-        datanode.data.getMetaDataInputStream(block);
-    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+    // The client side can now specify a range of the block for the checksum.
+    long requestLength = block.getNumBytes();
+    Preconditions.checkArgument(requestLength >= 0);
+    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    boolean partialBlk = requestLength < visibleLength;
+    updateCurrentThreadName("Reading metadata for block " + block);
+    final LengthInputStream metadataIn = datanode.data
+        .getMetaDataInputStream(block);
+
+    final DataInputStream checksumIn = new DataInputStream(
+        new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum();
+      final BlockMetadataHeader header = BlockMetadataHeader
+          .readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum();
+      final int csize = checksum.getChecksumSize();
       final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = checksum.getChecksumSize() > 0
-          ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
-          : 0;
-
-      //compute block checksum
-      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+      final long crcPerBlock = csize <= 0 ? 0 :
+          (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
+      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
+          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+          : MD5Hash.digest(checksumIn);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
       }
 
@@ -841,8 +887,7 @@ class DataXceiver extends Receiver imple
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
-          )
+          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();
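For all complete CRC chunks inside the requested range, calcPartialBlockChecksum feeds the CRCs already stored in the block's meta file into the MD5 digest; for a trailing partial chunk it re-reads that slice of the block data and computes a fresh CRC. A standalone, JDK-only sketch of the same scheme (it recomputes the "stored" CRCs too, since there is no meta file here; chunk size, block contents, and range are made up):

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.util.Random;
    import java.util.zip.CRC32;

    public class PartialBlockChecksumSketch {
      public static void main(String[] args) throws Exception {
        final int bytesPerCrc = 512;
        final int requestLength = 1100;     // range ends inside the third chunk
        byte[] block = new byte[1300];      // stand-in for the on-disk block
        new Random(0).nextBytes(block);

        MessageDigest digester = MessageDigest.getInstance("MD5");
        CRC32 crc = new CRC32();

        // Complete chunks: in HDFS these CRCs come straight from the meta file.
        int fullChunks = requestLength / bytesPerCrc;
        for (int i = 0; i < fullChunks; i++) {
          crc.reset();
          crc.update(block, i * bytesPerCrc, bytesPerCrc);
          digester.update(ByteBuffer.allocate(4).putInt((int) crc.getValue()).array());
        }

        // Partial tail chunk: recomputed from the block data itself, like
        // calcPartialBlockChecksum does via getBlockInputStream.
        int partialLength = requestLength % bytesPerCrc;
        if (partialLength > 0) {
          crc.reset();
          crc.update(block, fullChunks * bytesPerCrc, partialLength);
          digester.update(ByteBuffer.allocate(4).putInt((int) crc.getValue()).array());
        }
        System.out.printf("md5-of-crcs over first %d bytes: %d digest bytes%n",
            requestLength, digester.digest().length);
      }
    }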
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1596931&r1=1596930&r2=1596931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu May 22 18:17:11 2014
@@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
       MD5MD5CRC32FileChecksum checksum = null;
       DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
-        checksum = dfsclient.getFileChecksum(fullpath);
+        checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
         dfsclient.close();
         dfsclient = null;
       } finally {
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java?rev=1596931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java Thu May 22 18:17:11 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGetFileChecksum {
+  private static final int BLOCKSIZE = 1024;
+  private static final short REPLICATION = 3;
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
+        .build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public void testGetFileChecksum(final Path foo, final int appendLength)
+      throws Exception {
+    final int appendRounds = 16;
+    FileChecksum[] fc = new FileChecksum[appendRounds + 1];
+    DFSTestUtil.createFile(dfs, foo, appendLength, REPLICATION, 0L);
+    fc[0] = dfs.getFileChecksum(foo);
+    for (int i = 0; i < appendRounds; i++) {
+      DFSTestUtil.appendFile(dfs, foo, appendLength);
+      fc[i + 1] = dfs.getFileChecksum(foo);
+    }
+
+    for (int i = 0; i < appendRounds + 1; i++) {
+      FileChecksum checksum = dfs.getFileChecksum(foo, appendLength * (i+1));
+      Assert.assertTrue(checksum.equals(fc[i]));
+    }
+  }
+
+  @Test
+  public void testGetFileChecksum() throws Exception {
+    testGetFileChecksum(new Path("/foo"), BLOCKSIZE / 4);
+    testGetFileChecksum(new Path("/bar"), BLOCKSIZE / 4 - 1);
  }
}
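The test appends appendLength bytes sixteen times, recording the whole-file checksum after each append, then verifies that a ranged checksum over the first appendLength*(i+1) bytes reproduces the checksum the file had at that size; the BLOCKSIZE/4 - 1 case deliberately lands range boundaries inside a CRC chunk, exercising calcPartialBlockChecksum on the datanode. One more assertion in the same style, not part of the patch but expected to hold given how Long.MAX_VALUE is clamped: a range equal to the file's current length should match the plain whole-file checksum.

    // Hypothetical extra check (would slot into testGetFileChecksum above):
    long fullLen = dfs.getFileStatus(foo).getLen();
    Assert.assertEquals(dfs.getFileChecksum(foo),
        dfs.getFileChecksum(foo, fullLen));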