Repository: hadoop Updated Branches: refs/heads/trunk 33239c992 -> cd8b6889a
HDFS-9579. Provide bytes-read-by-network-distance metrics at FileSystem.Statistics level (Ming Ma via sjlee) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd8b6889 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd8b6889 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd8b6889 Branch: refs/heads/trunk Commit: cd8b6889a74a949e37f4b2eb664cdf3b59bfb93b Parents: 33239c9 Author: Sangjin Lee <sj...@apache.org> Authored: Sat Mar 19 14:02:04 2016 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Sat Mar 19 14:02:04 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/FileSystem.java | 118 ++++++++++++++++++- .../java/org/apache/hadoop/net/NetUtils.java | 16 ++- .../org/apache/hadoop/net/NetworkTopology.java | 17 ++- .../java/org/apache/hadoop/net/NodeBase.java | 18 ++- .../org/apache/hadoop/hdfs/BlockReader.java | 10 +- .../apache/hadoop/hdfs/BlockReaderFactory.java | 7 +- .../apache/hadoop/hdfs/BlockReaderLocal.java | 10 +- .../hadoop/hdfs/BlockReaderLocalLegacy.java | 10 +- .../org/apache/hadoop/hdfs/ClientContext.java | 56 ++++++++- .../java/org/apache/hadoop/hdfs/DFSClient.java | 11 +- .../org/apache/hadoop/hdfs/DFSInputStream.java | 14 +-- .../hadoop/hdfs/DFSStripedInputStream.java | 3 - .../apache/hadoop/hdfs/ExternalBlockReader.java | 10 +- .../apache/hadoop/hdfs/RemoteBlockReader.java | 29 ++--- .../apache/hadoop/hdfs/RemoteBlockReader2.java | 29 ++--- .../org/apache/hadoop/hdfs/ReplicaAccessor.java | 7 ++ .../erasurecode/ErasureCodingWorker.java | 3 +- .../hadoop/fs/TestEnhancedByteBufferAccess.java | 4 +- .../hadoop/hdfs/TestBlockReaderLocal.java | 4 +- .../org/apache/hadoop/hdfs/TestConnCache.java | 2 - .../hadoop/hdfs/TestDistributedFileSystem.java | 62 ++++++++++ .../hadoop/hdfs/TestExternalBlockReader.java | 8 +- .../apache/hadoop/net/TestNetworkTopology.java | 7 ++ 23 files changed, 368 
insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index a96ea40..a8a5c6d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -3023,11 +3023,15 @@ public abstract class FileSystem extends Configured implements Closeable { * need. */ public static class StatisticsData { - volatile long bytesRead; - volatile long bytesWritten; - volatile int readOps; - volatile int largeReadOps; - volatile int writeOps; + private volatile long bytesRead; + private volatile long bytesWritten; + private volatile int readOps; + private volatile int largeReadOps; + private volatile int writeOps; + private volatile long bytesReadLocalHost; + private volatile long bytesReadDistanceOfOneOrTwo; + private volatile long bytesReadDistanceOfThreeOrFour; + private volatile long bytesReadDistanceOfFiveOrLarger; /** * Add another StatisticsData object to this one. 
@@ -3038,6 +3042,12 @@ public abstract class FileSystem extends Configured implements Closeable { this.readOps += other.readOps; this.largeReadOps += other.largeReadOps; this.writeOps += other.writeOps; + this.bytesReadLocalHost += other.bytesReadLocalHost; + this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo; + this.bytesReadDistanceOfThreeOrFour += + other.bytesReadDistanceOfThreeOrFour; + this.bytesReadDistanceOfFiveOrLarger += + other.bytesReadDistanceOfFiveOrLarger; } /** @@ -3049,6 +3059,12 @@ public abstract class FileSystem extends Configured implements Closeable { this.readOps = -this.readOps; this.largeReadOps = -this.largeReadOps; this.writeOps = -this.writeOps; + this.bytesReadLocalHost = -this.bytesReadLocalHost; + this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo; + this.bytesReadDistanceOfThreeOrFour = + -this.bytesReadDistanceOfThreeOrFour; + this.bytesReadDistanceOfFiveOrLarger = + -this.bytesReadDistanceOfFiveOrLarger; } @Override @@ -3077,6 +3093,22 @@ public abstract class FileSystem extends Configured implements Closeable { public int getWriteOps() { return writeOps; } + + public long getBytesReadLocalHost() { + return bytesReadLocalHost; + } + + public long getBytesReadDistanceOfOneOrTwo() { + return bytesReadDistanceOfOneOrTwo; + } + + public long getBytesReadDistanceOfThreeOrFour() { + return bytesReadDistanceOfThreeOrFour; + } + + public long getBytesReadDistanceOfFiveOrLarger() { + return bytesReadDistanceOfFiveOrLarger; + } } private interface StatisticsAggregator<T> { @@ -3268,6 +3300,33 @@ public abstract class FileSystem extends Configured implements Closeable { } /** + * Increment the bytes read by the network distance in the statistics + * In the common network topology setup, distance value should be an even + * number such as 0, 2, 4, 6. To make it more general, we group distance + * by {1, 2}, {3, 4} and {5 and beyond} for accounting. 
+ * @param distance the network distance + * @param newBytes the additional bytes read + */ + public void incrementBytesReadByDistance(int distance, long newBytes) { + switch (distance) { + case 0: + getThreadStatistics().bytesReadLocalHost += newBytes; + break; + case 1: + case 2: + getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes; + break; + case 3: + case 4: + getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes; + break; + default: + getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes; + break; + } + } + + /** * Apply the given aggregator to all StatisticsData objects associated with * this Statistics object. * @@ -3384,6 +3443,55 @@ public abstract class FileSystem extends Configured implements Closeable { }); } + /** + * In the common network topology setup, distance value should be an even + * number such as 0, 2, 4, 6. To make it more general, we group distance + * by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller + * asks for bytes read for distance 2, the function will return the value + * for group {1, 2}. + * @param distance the network distance + * @return the total number of bytes read by the network distance + */ + public long getBytesReadByDistance(int distance) { + long bytesRead; + switch (distance) { + case 0: + bytesRead = getData().getBytesReadLocalHost(); + break; + case 1: + case 2: + bytesRead = getData().getBytesReadDistanceOfOneOrTwo(); + break; + case 3: + case 4: + bytesRead = getData().getBytesReadDistanceOfThreeOrFour(); + break; + default: + bytesRead = getData().getBytesReadDistanceOfFiveOrLarger(); + break; + } + return bytesRead; + } + + /** + * Get all statistics data. + * MR or other frameworks can use the method to get all statistics at once.
+ * @return the StatisticsData + */ + public StatisticsData getData() { + return visitAll(new StatisticsAggregator<StatisticsData>() { + private StatisticsData all = new StatisticsData(); + + @Override + public void accept(StatisticsData data) { + all.add(data); + } + + public StatisticsData aggregate() { + return all; + } + }); + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index e475149..2c3661a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -638,13 +638,27 @@ public class NetUtils { /** * Return hostname without throwing exception. + * The returned hostname String format is "hostname". + * @return hostname + */ + public static String getLocalHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch(UnknownHostException uhe) { + return "" + uhe; + } + } + + /** + * Return hostname without throwing exception. + * The returned hostname String format is "hostname/ip address". * @return hostname */ public static String getHostname() { try {return "" + InetAddress.getLocalHost();} catch(UnknownHostException uhe) {return "" + uhe;} } - + /** * Compose a "host:port" string from the address. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index b637da1..e1d2968 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -369,6 +369,16 @@ public class NetworkTopology { int getNumOfLeaves() { return numOfLeaves; } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object to) { + return super.equals(to); + } } // end of InnerNode /** @@ -607,9 +617,14 @@ public class NetworkTopology { * or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster */ public int getDistance(Node node1, Node node2) { - if (node1 == node2) { + if ((node1 != null && node1.equals(node2)) || + (node1 == null && node2 == null)) { return 0; } + if (node1 == null || node2 == null) { + LOG.warn("One of the nodes is a null pointer"); + return Integer.MAX_VALUE; + } Node n1=node1, n2=node2; int dis = 0; netlock.readLock().lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java index b136297..b465098 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java @@ -112,7 +112,23 @@ public class NodeBase implements Node { public static String getPath(Node node) { return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName(); } - + + @Override + public boolean equals(Object to) { + if (this == to) { + return true; + } + if (!(to instanceof NodeBase)) { + return false; + } + return getPath(this).equals(getPath((NodeBase)to)); + } + + @Override + public int hashCode() { + return getPath(this).hashCode(); + } + /** @return this node's path as its string representation */ @Override public String toString() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 150cf23..63acaa7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -84,11 +84,6 @@ public interface BlockReader extends ByteBufferReadable, Closeable { int readAll(byte[] buf, int offset, int len) throws IOException; /** - * @return true only if this is a local read. - */ - boolean isLocal(); - - /** * @return true only if this is a short-circuit read. * All short-circuit reads are also local. */ @@ -107,4 +102,9 @@ public interface BlockReader extends ByteBufferReadable, Closeable { * @return The DataChecksum used by the read block */ DataChecksum getDataChecksum(); + + /** + * Return the network distance between local machine and the remote machine. 
+ */ + int getNetworkDistance(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 5c7bbd7..8a0050f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -833,16 +833,19 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { @SuppressWarnings("deprecation") private BlockReader getRemoteBlockReader(Peer peer) throws IOException { + int networkDistance = clientContext.getNetworkDistance(datanode); if (conf.getShortCircuitConf().isUseLegacyBlockReader()) { return RemoteBlockReader.newBlockReader(fileName, block, token, startOffset, length, conf.getIoBufferSize(), verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy, tracer); + clientContext.getPeerCache(), cachingStrategy, tracer, + networkDistance); } else { return RemoteBlockReader2.newBlockReader( fileName, block, token, startOffset, length, verifyChecksum, clientName, peer, datanode, - clientContext.getPeerCache(), cachingStrategy, tracer); + clientContext.getPeerCache(), cachingStrategy, tracer, + networkDistance); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 859380c..68630c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -641,11 +641,6 @@ class BlockReaderLocal implements BlockReader { } @Override - public boolean isLocal() { - return true; - } - - @Override public boolean isShortCircuit() { return true; } @@ -721,4 +716,9 @@ class BlockReaderLocal implements BlockReader { public DataChecksum getDataChecksum() { return checksum; } + + @Override + public int getNetworkDistance() { + return 0; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 7206c07..65a8373 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -723,11 +723,6 @@ class BlockReaderLocalLegacy implements BlockReader { } @Override - public boolean isLocal() { - return true; - } - - @Override public boolean isShortCircuit() { return true; } @@ -741,4 +736,9 @@ class BlockReaderLocalLegacy implements BlockReader { public DataChecksum getDataChecksum() { return checksum; } + + @Override + public int getNetworkDistance() { + return 0; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 047645b..47d6d49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -17,16 +17,28 @@ */ package org.apache.hadoop.hdfs; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.net.ScriptBasedMapping; +import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -101,7 +113,12 @@ public class ClientContext { */ private boolean printedConfWarning = false; - private ClientContext(String name, DfsClientConf conf) { + private final NetworkTopology topology; + private final NodeBase clientNode; + private final Map<NodeBase, Integer> nodeToDistance; + + private ClientContext(String name, DfsClientConf conf, + Configuration config) { 
final ShortCircuitConf scConf = conf.getShortCircuitConf(); this.name = name; @@ -116,14 +133,28 @@ public class ClientContext { this.byteArrayManager = ByteArrayManager.newInstance( conf.getWriteByteArrayManagerConf()); + + DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance( + config.getClass( + CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + ScriptBasedMapping.class, DNSToSwitchMapping.class), config); + List<String> nodes = new ArrayList<>(); + String clientHostName = NetUtils.getLocalHostname(); + nodes.add(clientHostName); + clientNode = new NodeBase(clientHostName, + dnsToSwitchMapping.resolve(nodes).get(0)); + this.topology = NetworkTopology.getInstance(config); + this.topology.add(clientNode); + this.nodeToDistance = new ConcurrentHashMap<>(); } - public static ClientContext get(String name, DfsClientConf conf) { + public static ClientContext get(String name, DfsClientConf conf, + Configuration config) { ClientContext context; synchronized(ClientContext.class) { context = CACHES.get(name); if (context == null) { - context = new ClientContext(name, conf); + context = new ClientContext(name, conf, config); CACHES.put(name, context); } else { context.printConfWarningIfNeeded(conf); @@ -132,6 +163,10 @@ public class ClientContext { return context; } + public static ClientContext get(String name, Configuration config) { + return get(name, new DfsClientConf(config), config); + } + /** * Get a client context, from a Configuration object. 
* @@ -141,8 +176,7 @@ public class ClientContext { @VisibleForTesting public static ClientContext getFromConf(Configuration conf) { return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, - HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), - new DfsClientConf(conf)); + HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), conf); } private void printConfWarningIfNeeded(DfsClientConf conf) { @@ -193,4 +227,16 @@ public class ClientContext { public ByteArrayManager getByteArrayManager() { return byteArrayManager; } + + public int getNetworkDistance(DatanodeInfo datanodeInfo) { + NodeBase node = new NodeBase(datanodeInfo.getHostName(), + datanodeInfo.getNetworkLocation()); + Integer distance = nodeToDistance.get(node); + if (distance == null) { + topology.add(node); + distance = topology.getDistance(clientNode, node); + nodeToDistance.put(node, distance); + } + return distance; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0976920..3506d3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -212,7 +212,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final String clientName; final SocketFactory socketFactory; final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure; - final FileSystem.Statistics stats; + private final FileSystem.Statistics stats; private final String authority; private final Random r = new Random(); private SocketAddress[] localInterfaceAddrs; @@ -357,7 +357,7 @@ public class DFSClient 
implements java.io.Closeable, RemotePeerFactory, new CachingStrategy(writeDropBehind, readahead); this.clientContext = ClientContext.get( conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT), - dfsClientConf); + dfsClientConf, conf); if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { this.initThreadsNumForHedgedReads(dfsClientConf. @@ -2740,6 +2740,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } + void updateFileSystemReadStats(int distance, int nRead) { + if (stats != null) { + stats.incrementBytesRead(nRead); + stats.incrementBytesReadByDistance(distance, nRead); + } + } + /** * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if * it does not already exist. http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index d713e8f..7661e82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -775,7 +775,7 @@ public class DFSInputStream extends FSInputStream synchronized(infoLock) { if (blockReader.isShortCircuit()) { readStatistics.addShortCircuitBytes(nRead); - } else if (blockReader.isLocal()) { + } else if (blockReader.getNetworkDistance() == 0) { readStatistics.addLocalBytes(nRead); } else { readStatistics.addRemoteBytes(nRead); @@ -798,6 +798,8 @@ public class DFSInputStream extends FSInputStream throws IOException { int nRead = blockReader.read(buf, off, len); updateReadStatistics(readStatistics, nRead, blockReader); + 
dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(), + nRead); return nRead; } @@ -828,6 +830,8 @@ public class DFSInputStream extends FSInputStream int ret = blockReader.read(buf); success = true; updateReadStatistics(readStatistics, ret, blockReader); + dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(), + ret); if (ret == 0) { DFSClient.LOG.warn("zero"); } @@ -939,9 +943,6 @@ public class DFSInputStream extends FSInputStream // got a EOS from reader though we expect more data on it. throw new IOException("Unexpected EOS from the reader"); } - if (dfsClient.stats != null) { - dfsClient.stats.incrementBytesRead(result); - } return result; } catch (ChecksumException ce) { throw ce; @@ -1194,6 +1195,8 @@ public class DFSInputStream extends FSInputStream datanode.storageType, datanode.info); int nread = reader.readAll(buf, offset, len); updateReadStatistics(readStatistics, nread, reader); + dfsClient.updateFileSystemReadStats( + reader.getNetworkDistance(), nread); if (nread != len) { throw new IOException("truncated return from reader.read(): " + "excpected " + len + ", got " + nread); @@ -1479,9 +1482,6 @@ public class DFSInputStream extends FSInputStream offset += bytesToRead; } assert remaining == 0 : "Wrong number of bytes read."; - if (dfsClient.stats != null) { - dfsClient.stats.incrementBytesRead(realLen); - } return realLen; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index f6547f3..38236ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -447,9 +447,6 @@ public class DFSStripedInputStream extends DFSInputStream { result += ret; pos += ret; } - if (dfsClient.stats != null) { - dfsClient.stats.incrementBytesRead(result); - } return result; } finally { // Check if need to report block replicas corruption either read http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java index 42bec5c..707a56a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java @@ -110,11 +110,6 @@ public final class ExternalBlockReader implements BlockReader { } @Override - public boolean isLocal() { - return accessor.isLocal(); - } - - @Override public boolean isShortCircuit() { return accessor.isShortCircuit(); } @@ -129,4 +124,9 @@ public final class ExternalBlockReader implements BlockReader { public DataChecksum getDataChecksum() { return null; } + + @Override + public int getNetworkDistance() { + return accessor.getNetworkDistance(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 544e1b3..7e094f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.apache.htrace.core.TraceScope; @@ -93,11 +92,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { */ private final long bytesNeededToFinish; - /** - * True if we are reading from a local DataNode. - */ - private final boolean isLocal; - private boolean eos = false; private boolean sentStatusCode = false; @@ -109,6 +103,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private final Tracer tracer; + private final int networkDistance; + /* FSInputChecker interface */ /* same interface as inputStream java.io.InputStream#read() @@ -342,7 +338,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private RemoteBlockReader(String file, String bpid, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, + int networkDistance) { // Path is used only for printing block and file information in debug super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, @@ -351,9 +348,6 @@ public class 
RemoteBlockReader extends FSInputChecker implements BlockReader { checksum.getBytesPerChecksum(), checksum.getChecksumSize()); - this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. - createSocketAddr(datanodeID.getXferAddr())); - this.peer = peer; this.datanodeID = datanodeID; this.in = in; @@ -375,6 +369,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { checksumSize = this.checksum.getChecksumSize(); this.peerCache = peerCache; this.tracer = tracer; + this.networkDistance = networkDistance; } /** @@ -400,7 +395,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy, - Tracer tracer) + Tracer tracer, int networkDistance) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = @@ -436,7 +431,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, - peer, datanodeID, peerCache, tracer); + peer, datanodeID, peerCache, tracer, networkDistance); } @Override @@ -494,11 +489,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { } @Override - public boolean isLocal() { - return isLocal; - } - - @Override public boolean isShortCircuit() { return false; } @@ -512,4 +502,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public DataChecksum getDataChecksum() { return checksum; } + + @Override + public int getNetworkDistance() { + return networkDistance; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 22e4757..9437353 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.apache.htrace.core.TraceScope; @@ -116,17 +115,14 @@ public class RemoteBlockReader2 implements BlockReader { */ private long bytesNeededToFinish; - /** - * True if we are reading from a local DataNode. - */ - private final boolean isLocal; - private final boolean verifyChecksum; private boolean sentStatusCode = false; private final Tracer tracer; + private final int networkDistance; + @VisibleForTesting public Peer getPeer() { return peer; @@ -280,9 +276,8 @@ public class RemoteBlockReader2 implements BlockReader { protected RemoteBlockReader2(String file, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { - this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. 
- createSocketAddr(datanodeID.getXferAddr())); + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer, + int networkDistance) { // Path is used only for printing block and file information in debug this.peer = peer; this.datanodeID = datanodeID; @@ -302,6 +297,7 @@ public class RemoteBlockReader2 implements BlockReader { bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); this.tracer = tracer; + this.networkDistance = networkDistance; } @@ -397,7 +393,8 @@ public class RemoteBlockReader2 implements BlockReader { Peer peer, DatanodeID datanodeID, PeerCache peerCache, CachingStrategy cachingStrategy, - Tracer tracer) throws IOException { + Tracer tracer, + int networkDistance) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); @@ -430,7 +427,7 @@ public class RemoteBlockReader2 implements BlockReader { return new RemoteBlockReader2(file, block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID, - peerCache, tracer); + peerCache, tracer, networkDistance); } static void checkSuccess( @@ -454,11 +451,6 @@ public class RemoteBlockReader2 implements BlockReader { } @Override - public boolean isLocal() { - return isLocal; - } - - @Override public boolean isShortCircuit() { return false; } @@ -472,4 +464,9 @@ public class RemoteBlockReader2 implements BlockReader { public DataChecksum getDataChecksum() { return checksum; } + + @Override + public int getNetworkDistance() { + return networkDistance; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java index e0b21e8..556c2c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java @@ -87,4 +87,11 @@ public abstract class ReplicaAccessor { * short-circuit byte count statistics. */ public abstract boolean isShortCircuit(); + + /** + * Return the network distance between the local machine and the remote machine. + */ + public int getNetworkDistance() { + return isLocal() ? 0 : Integer.MAX_VALUE; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index bde8d80..74fb3e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -849,12 +849,13 @@ public final class ErasureCodingWorker { * read directly from DN and need to check the replica is FINALIZED * state, notice we should not use short-circuit local read which * requires config for domain-socket in UNIX or legacy config in Windows. + * The network distance value isn't used for this scenario. 
*/ return RemoteBlockReader2.newBlockReader( "dummy", block, blockToken, offsetInBlock, block.getNumBytes() - offsetInBlock, true, "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, - null, cachingStrategy, datanode.getTracer()); + null, cachingStrategy, datanode.getTracer(), -1); } catch (IOException e) { LOG.debug("Exception while creating remote block reader, datanode {}", dnInfo, e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 3455f55..a1af1fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess { fsIn.close(); fsIn = fs.open(TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, new DfsClientConf(conf)). getShortCircuitCache(); + CONTEXT, conf).getShortCircuitCache(); cache.accept(new CountingVisitor(0, 5, 5, 0)); results[0] = fsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); @@ -661,7 +661,7 @@ public class TestEnhancedByteBufferAccess { final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, new DfsClientConf(conf)). 
getShortCircuitCache(); + CONTEXT, conf).getShortCircuitCache(); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); // Uncache the replica fs.removeCacheDirective(directiveId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index 2d6c63a..0048d2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.util.Time; @@ -736,7 +737,8 @@ public class TestBlockReaderLocal { byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; FileSystem fs = null; try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster = new MiniDFSCluster.Builder(conf). 
+ hosts(new String[] {NetUtils.getLocalHostname()}).build(); cluster.waitActive(); fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index afa5d27..8d2398d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -99,8 +99,6 @@ public class TestConnCache { DFSClient client = new DFSClient( new InetSocketAddress("localhost", util.getCluster().getNameNodePort()), util.getConf()); - ClientContext cacheContext = - ClientContext.get(contextName, client.getConf()); DFSInputStream in = client.open(testFile.toString()); LOG.info("opened " + testFile.toString()); byte[] dataBuf = new byte[BLOCK_SIZE]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 6217c45..1db0da8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; @@ -651,6 +652,67 @@ public class TestDistributedFileSystem { assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps()); } + /** Checks read statistics. */ + private void checkReadStatistics(FileSystem fs, int distance, long expectedReadBytes) { + long bytesRead = DFSTestUtil.getStatistics(fs). + getBytesReadByDistance(distance); + assertEquals(expectedReadBytes, bytesRead); + } + + @Test + public void testLocalHostReadStatistics() throws Exception { + testReadFileSystemStatistics(0); + } + + @Test + public void testLocalRackReadStatistics() throws Exception { + testReadFileSystemStatistics(2); + } + + @Test + public void testRemoteRackOfFirstDegreeReadStatistics() throws Exception { + testReadFileSystemStatistics(4); + } + + /** expectedDistance is the expected distance between client and dn. + * 0 means local host. + * 2 means same rack. + * 4 means remote rack of first degree. + */ + private void testReadFileSystemStatistics(int expectedDistance) + throws IOException { + MiniDFSCluster cluster = null; + final Configuration conf = getTestConfiguration(); + + // create a cluster with a dn with the expected distance. + if (expectedDistance == 0) { + cluster = new MiniDFSCluster.Builder(conf). + hosts(new String[] {NetUtils.getLocalHostname()}).build(); + } else if (expectedDistance == 2) { + cluster = new MiniDFSCluster.Builder(conf). + hosts(new String[] {"hostFoo"}).build(); + } else if (expectedDistance == 4) { + cluster = new MiniDFSCluster.Builder(conf). 
+ racks(new String[] {"/rackFoo"}).build(); + } + + // create a file, read the file and verify the metrics + try { + final FileSystem fs = cluster.getFileSystem(); + DFSTestUtil.getStatistics(fs).reset(); + Path dir = new Path("/test"); + Path file = new Path(dir, "file"); + String input = "hello world"; + DFSTestUtil.writeFile(fs, file, input); + FSDataInputStream stm = fs.open(file); + byte[] actual = new byte[4096]; + stm.read(actual); + checkReadStatistics(fs, expectedDistance, input.length()); + } finally { + if (cluster != null) cluster.shutdown(); + } + } + @Test public void testFileChecksum() throws Exception { final long seed = RAN.nextLong(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java index 2c36baa..5c2b6da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; import org.junit.Assert; import org.junit.Test; @@ -246,6 +247,11 @@ public class TestExternalBlockReader { return true; } + @Override + public int getNetworkDistance() { + return 0; + } + synchronized String getError() { return error; } @@ -271,7 +277,7 @@ public class TestExternalBlockReader { String uuid = UUID.randomUUID().toString(); conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid); 
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1) + .hosts(new String[] {NetUtils.getLocalHostname()}) .build(); final int TEST_LENGTH = 2047; DistributedFileSystem dfs = cluster.getFileSystem(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index 45f6cb4..736230c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -129,6 +129,13 @@ public class TestNetworkTopology { assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2); assertEquals(cluster.getDistance(dataNodes[0], dataNodes[3]), 4); assertEquals(cluster.getDistance(dataNodes[0], dataNodes[6]), 6); + // verify the distance is zero as long as two nodes have the same path. + // They don't need to refer to the same object. + NodeBase node1 = new NodeBase(dataNodes[0].getHostName(), + dataNodes[0].getNetworkLocation()); + NodeBase node2 = new NodeBase(dataNodes[0].getHostName(), + dataNodes[0].getNetworkLocation()); + assertEquals(0, cluster.getDistance(node1, node2)); } @Test