Author: jing9
Date: Mon Aug 18 17:51:18 2014
New Revision: 1618675
URL: http://svn.apache.org/r1618675
Log:
HDFS-6801. Archival Storage: Add a new data migration tool. Contributed by Tsz Wo Nicholas Sze.
Added:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java?rev=1618675&r1=1618674&r2=1618675&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
Mon Aug 18 17:51:18 2014
@@ -180,7 +180,23 @@ public class BlockStoragePolicy {
public StorageType getReplicationFallback(EnumSet<StorageType> unavailables) {
return getFallback(unavailables, replicationFallbacks);
}
-
+
+ @Override
+ public int hashCode() {
+ return Byte.valueOf(id).hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (obj == null || !(obj instanceof BlockStoragePolicy)) {
+ return false;
+ }
+ final BlockStoragePolicy that = (BlockStoragePolicy)obj;
+ return this.id == that.id;
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + "{" + name + ":" + id
@@ -193,6 +209,10 @@ public class BlockStoragePolicy {
return id;
}
+ public String getName() {
+ return name;
+ }
+
private static StorageType getFallback(EnumSet<StorageType> unavailables,
StorageType[] fallbacks) {
for(StorageType fb : fallbacks) {
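
Note (not part of the patch): the new hashCode()/equals() make the policy id the identity of a BlockStoragePolicy, so policies can safely be used as set/map keys. A minimal sketch of what this enables, assuming a suite loaded via readBlockStorageSuite() as the Mover below does; PolicyIdentitySketch is an illustrative name and the id is assumed to exist in the suite:

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockStoragePolicy;

class PolicyIdentitySketch {
  // Two lookups of the same policy id now behave as a single key.
  static boolean sameKey(Configuration conf, byte policyId) {
    final BlockStoragePolicy.Suite suite =
        BlockStoragePolicy.readBlockStorageSuite(conf);
    final BlockStoragePolicy a = suite.getPolicy(policyId);
    final BlockStoragePolicy b = suite.getPolicy(policyId);
    final Set<BlockStoragePolicy> seen = new HashSet<BlockStoragePolicy>();
    seen.add(a);
    return a.equals(b) && a.hashCode() == b.hashCode() && seen.contains(b);
  }
}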
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1618675&r1=1618674&r2=1618675&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
Mon Aug 18 17:51:18 2014
@@ -362,6 +362,12 @@ public class DFSConfigKeys extends Commo
public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
+
+ public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
+ public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
+ public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
+ public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
+
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 50010;
public static final String DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_DEFAULT_PORT;
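
Note (illustration only): the two new keys mirror the existing balancer keys and are read with these defaults; the Mover constructor in the new file below does exactly this. MoverConfSketch is just an illustrative wrapper:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class MoverConfSketch {
  // dfs.mover.movedWinWidth: time window (ms) for tracking recently moved
  // blocks, by analogy with dfs.balancer.movedWinWidth
  static long movedWinWidthMs(Configuration conf) {
    return conf.getLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
        DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT); // 5400*1000 ms = 90 min
  }

  // dfs.mover.moverThreads: size of the block-moving thread pool
  static int moverThreads(Configuration conf) {
    return conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
        DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); // 1000
  }
}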
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1618675&r1=1618674&r2=1618675&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
Mon Aug 18 17:51:18 2014
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.text.DateFormat;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -54,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -270,7 +270,7 @@ public class Balancer {
// over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
- final DDatanode dn = dispatcher.newDatanode(r);
+ final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
for(StorageType t : StorageType.asList()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
@@ -294,7 +294,7 @@ public class Balancer {
}
g = s;
} else {
- g = dn.addStorageGroup(t, maxSize2Move);
+ g = dn.addTarget(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g);
} else {
@@ -546,15 +546,10 @@ public class Balancer {
final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already
Moved Bytes Left To Move Bytes Being Moved");
- final List<NameNodeConnector> connectors
- = new ArrayList<NameNodeConnector>(namenodes.size());
+ List<NameNodeConnector> connectors = Collections.emptyList();
try {
- for (URI uri : namenodes) {
- final NameNodeConnector nnc = new NameNodeConnector(
- Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
- nnc.getKeyManager().startBlockKeyUpdater();
- connectors.add(nnc);
- }
+ connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
+ Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf);
boolean done = false;
for(int iteration = 0; !done; iteration++) {
@@ -579,7 +574,7 @@ public class Balancer {
}
} finally {
for(NameNodeConnector nnc : connectors) {
- nnc.close();
+ IOUtils.cleanup(LOG, nnc);
}
}
return ExitStatus.SUCCESS.getExitCode();
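
Note: the connector setup and teardown above is now shared with the new Mover; IOUtils.cleanup(LOG, nnc) logs and swallows any IOException from close(), so one failing connector no longer prevents the rest from being closed. A minimal sketch of the shared pattern (CloseAllSketch is an illustrative name):

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.io.IOUtils;

class CloseAllSketch {
  private static final Log LOG = LogFactory.getLog(CloseAllSketch.class);

  static void closeAll(List<NameNodeConnector> connectors) {
    for (NameNodeConnector nnc : connectors) {
      IOUtils.cleanup(LOG, nnc); // logs and ignores IOException from close()
    }
  }
}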
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1618675&r1=1618674&r2=1618675&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
Mon Aug 18 17:51:18 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,7 +104,8 @@ public class Dispatcher {
private final MovedBlocks<StorageGroup> movedBlocks;
/** Map (datanodeUuid,storageType -> StorageGroup) */
- private final StorageGroupMap storageGroupMap = new StorageGroupMap();
+ private final StorageGroupMap<StorageGroup> storageGroupMap
+ = new StorageGroupMap<StorageGroup>();
private NetworkTopology cluster;
@@ -140,18 +142,18 @@ public class Dispatcher {
}
}
- static class StorageGroupMap {
+ public static class StorageGroupMap<G extends StorageGroup> {
private static String toKey(String datanodeUuid, StorageType storageType) {
return datanodeUuid + ":" + storageType;
}
- private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>();
+ private final Map<String, G> map = new HashMap<String, G>();
- StorageGroup get(String datanodeUuid, StorageType storageType) {
+ public G get(String datanodeUuid, StorageType storageType) {
return map.get(toKey(datanodeUuid, storageType));
}
- void put(StorageGroup g) {
+ public void put(G g) {
final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(),
g.storageType);
final StorageGroup existing = map.put(key, g);
Preconditions.checkState(existing == null);
@@ -167,7 +169,7 @@ public class Dispatcher {
}
/** This class keeps track of a scheduled block move */
- private class PendingMove {
+ public class PendingMove {
private DBlock block;
private Source source;
private DDatanode proxySource;
@@ -176,6 +178,12 @@ public class Dispatcher {
private PendingMove() {
}
+ public PendingMove(DBlock block, Source source, StorageGroup target) {
+ this.block = block;
+ this.source = source;
+ this.target = target;
+ }
+
@Override
public String toString() {
final Block b = block.getBlock();
@@ -227,7 +235,7 @@ public class Dispatcher {
*
* @return true if a proxy is found; otherwise false
*/
- private boolean chooseProxySource() {
+ public boolean chooseProxySource() {
final DatanodeInfo targetDN = target.getDatanodeInfo();
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
@@ -356,8 +364,8 @@ public class Dispatcher {
}
/** A class for keeping track of block locations in the dispatcher. */
- private static class DBlock extends MovedBlocks.Locations<StorageGroup> {
- DBlock(Block block) {
+ public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
+ public DBlock(Block block) {
super(block);
}
}
@@ -378,10 +386,10 @@ public class Dispatcher {
}
/** A class that keeps track of a datanode. */
- static class DDatanode {
+ public static class DDatanode {
/** A group of storages in a datanode with the same storage type. */
- class StorageGroup {
+ public class StorageGroup {
final StorageType storageType;
final long maxSize2Move;
private long scheduledSize = 0L;
@@ -390,18 +398,26 @@ public class Dispatcher {
this.storageType = storageType;
this.maxSize2Move = maxSize2Move;
}
+
+ public StorageType getStorageType() {
+ return storageType;
+ }
private DDatanode getDDatanode() {
return DDatanode.this;
}
- DatanodeInfo getDatanodeInfo() {
+ public DatanodeInfo getDatanodeInfo() {
return DDatanode.this.datanode;
}
/** Decide if still need to move more bytes */
- synchronized boolean hasSpaceForScheduling() {
- return availableSizeToMove() > 0L;
+ boolean hasSpaceForScheduling() {
+ return hasSpaceForScheduling(0L);
+ }
+
+ synchronized boolean hasSpaceForScheduling(long size) {
+ return availableSizeToMove() > size;
}
/** @return the total number of bytes that need to be moved */
@@ -410,7 +426,7 @@ public class Dispatcher {
}
/** increment scheduled size */
- synchronized void incScheduledSize(long size) {
+ public synchronized void incScheduledSize(long size) {
scheduledSize += size;
}
@@ -436,7 +452,9 @@ public class Dispatcher {
}
final DatanodeInfo datanode;
- final EnumMap<StorageType, StorageGroup> storageMap
+ private final EnumMap<StorageType, Source> sourceMap
+ = new EnumMap<StorageType, Source>(StorageType.class);
+ private final EnumMap<StorageType, StorageGroup> targetMap
= new EnumMap<StorageType, StorageGroup>(StorageType.class);
protected long delayUntil = 0L;
/** blocks being moved but not confirmed yet */
@@ -445,29 +463,34 @@ public class Dispatcher {
@Override
public String toString() {
- return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values();
+ return getClass().getSimpleName() + ":" + datanode;
}
- private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
- this.datanode = r.getDatanodeInfo();
+ private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
+ this.datanode = datanode;
this.maxConcurrentMoves = maxConcurrentMoves;
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
}
- private void put(StorageType storageType, StorageGroup g) {
- final StorageGroup existing = storageMap.put(storageType, g);
+ public DatanodeInfo getDatanodeInfo() {
+ return datanode;
+ }
+
+ private static <G extends StorageGroup> void put(StorageType storageType,
+ G g, EnumMap<StorageType, G> map) {
+ final StorageGroup existing = map.put(storageType, g);
Preconditions.checkState(existing == null);
}
- StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) {
+ public StorageGroup addTarget(StorageType storageType, long maxSize2Move) {
final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
- put(storageType, g);
+ put(storageType, g, targetMap);
return g;
}
- Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
+ public Source addSource(StorageType storageType, long maxSize2Move,
+ Dispatcher d) {
final Source s = d.new Source(storageType, maxSize2Move, this);
- put(storageType, s);
+ put(storageType, s, sourceMap);
return s;
}
@@ -508,7 +531,7 @@ public class Dispatcher {
}
/** A node that can be the sources of a block move */
- class Source extends DDatanode.StorageGroup {
+ public class Source extends DDatanode.StorageGroup {
private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L;
@@ -654,13 +677,7 @@ public class Dispatcher {
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
final PendingMove p = chooseNextMove();
if (p != null) {
- // move the block
- moveExecutor.execute(new Runnable() {
- @Override
- public void run() {
- p.dispatch();
- }
- });
+ executePendingMove(p);
continue;
}
@@ -716,7 +733,8 @@ public class Dispatcher {
this.cluster = NetworkTopology.getInstance(conf);
this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
- this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
+ this.dispatchExecutor = dispatcherThreads == 0? null
+ : Executors.newFixedThreadPool(dispatcherThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
@@ -727,11 +745,15 @@ public class Dispatcher {
TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
}
- StorageGroupMap getStorageGroupMap() {
+ public DistributedFileSystem getDistributedFileSystem() {
+ return nnc.getDistributedFileSystem();
+ }
+
+ public StorageGroupMap<StorageGroup> getStorageGroupMap() {
return storageGroupMap;
}
- NetworkTopology getCluster() {
+ public NetworkTopology getCluster() {
return cluster;
}
@@ -779,7 +801,7 @@ public class Dispatcher {
}
/** Get live datanode storage reports and then build the network topology. */
- List<DatanodeStorageReport> init() throws IOException {
+ public List<DatanodeStorageReport> init() throws IOException {
final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
// create network topology and classify utilization collections:
@@ -795,8 +817,18 @@ public class Dispatcher {
return trimmed;
}
- public DDatanode newDatanode(DatanodeStorageReport r) {
- return new DDatanode(r, maxConcurrentMovesPerNode);
+ public DDatanode newDatanode(DatanodeInfo datanode) {
+ return new DDatanode(datanode, maxConcurrentMovesPerNode);
+ }
+
+ public void executePendingMove(final PendingMove p) {
+ // move the block
+ moveExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ p.dispatch();
+ }
+ });
}
public boolean dispatchAndCheckContinue() throws InterruptedException {
@@ -869,6 +901,12 @@ public class Dispatcher {
}
}
+ private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
+ DBlock block) {
+ // match source and target storage type
+ return isGoodBlockCandidate(source, target, source.getStorageType(), block);
+ }
+
/**
* Decide if the block is a good candidate to be moved from source to target.
* A block is a good candidate if
@@ -876,9 +914,12 @@ public class Dispatcher {
* 2. the block does not have a replica on the target;
* 3. doing the move does not reduce the number of racks that the block has
*/
- private boolean isGoodBlockCandidate(Source source, StorageGroup target,
- DBlock block) {
- if (source.storageType != target.storageType) {
+ public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
+ StorageType targetStorageType, DBlock block) {
+ if (target.storageType != targetStorageType) {
+ return false;
+ }
+ if (!target.hasSpaceForScheduling(block.getNumBytes())) {
return false;
}
// check if the block is moved or not
@@ -889,7 +930,7 @@ public class Dispatcher {
return false;
}
if (cluster.isNodeGroupAware()
- && isOnSameNodeGroupWithReplicas(target, block, source)) {
+ && isOnSameNodeGroupWithReplicas(source, target, block)) {
return false;
}
if (reduceNumOfRacks(source, target, block)) {
@@ -902,7 +943,7 @@ public class Dispatcher {
* Determine whether moving the given block replica from source to target
* would reduce the number of racks of the block replicas.
*/
- private boolean reduceNumOfRacks(Source source, StorageGroup target,
+ private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
DBlock block) {
final DatanodeInfo sourceDn = source.getDatanodeInfo();
if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
@@ -939,8 +980,8 @@ public class Dispatcher {
* @return true if there are any replica (other than source) on the same node
* group with target
*/
- private boolean isOnSameNodeGroupWithReplicas(
- StorageGroup target, DBlock block, Source source) {
+ private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
+ StorageGroup target, DBlock block) {
final DatanodeInfo targetDn = target.getDatanodeInfo();
for (StorageGroup g : block.getLocations()) {
if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(),
targetDn)) {
@@ -961,7 +1002,7 @@ public class Dispatcher {
}
/** shutdown thread pools */
- void shutdownNow() {
+ public void shutdownNow() {
dispatchExecutor.shutdownNow();
moveExecutor.shutdownNow();
}
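
Note: the visibility changes above open Dispatcher up so a tool other than the Balancer can schedule block moves. A rough sketch of how the now-public pieces are meant to compose, mirroring Mover.Processor.chooseTarget() in the new file below; ScheduleOneMoveSketch is an illustrative name:

import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.PendingMove;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;

class ScheduleOneMoveSketch {
  // Schedule one replica move if the target fits and a proxy source is found.
  static boolean schedule(Dispatcher dispatcher, DBlock block, Source source,
      StorageGroup target, StorageType wantedType) {
    if (dispatcher.isGoodBlockCandidate(source, target, wantedType, block)) {
      // PendingMove is a (now public) inner class, hence "dispatcher.new"
      final PendingMove pm = dispatcher.new PendingMove(block, source, target);
      if (pm.chooseProxySource()) {
        target.incScheduledSize(block.getNumBytes());
        dispatcher.executePendingMove(pm);
        return true;
      }
    }
    return false;
  }
}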
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1618675&r1=1618674&r2=1618675&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
(original)
+++
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
Mon Aug 18 17:51:18 2014
@@ -23,6 +23,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -51,6 +55,20 @@ public class NameNodeConnector implement
private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
+
+ /** Create {@link NameNodeConnector} for the given namenodes. */
+ public static List<NameNodeConnector> newNameNodeConnectors(
+ Collection<URI> namenodes, String name, Path idPath, Configuration conf)
+ throws IOException {
+ final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+ namenodes.size());
+ for (URI uri : namenodes) {
+ NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, conf);
+ nnc.getKeyManager().startBlockKeyUpdater();
+ connectors.add(nnc);
+ }
+ return connectors;
+ }
private final URI nameNodeUri;
private final String blockpoolID;
@@ -59,7 +77,7 @@ public class NameNodeConnector implement
private final ClientProtocol client;
private final KeyManager keyManager;
- private final FileSystem fs;
+ private final DistributedFileSystem fs;
private final Path idPath;
private final OutputStream out;
@@ -74,7 +92,7 @@ public class NameNodeConnector implement
NamenodeProtocol.class).getProxy();
this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class).getProxy();
- this.fs = FileSystem.get(nameNodeUri, conf);
+ this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();
@@ -89,6 +107,10 @@ public class NameNodeConnector implement
}
}
+ public DistributedFileSystem getDistributedFileSystem() {
+ return fs;
+ }
+
/** @return the block pool ID */
public String getBlockpoolID() {
return blockpoolID;
Added:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java?rev=1618675&view=auto
==============================================================================
---
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
(added)
+++
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
Mon Aug 18 17:51:18 2014
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.PendingMove;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.StorageGroupMap;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+@InterfaceAudience.Private
+public class Mover {
+ static final Log LOG = LogFactory.getLog(Mover.class);
+
+ private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
+
+ private static class StorageMap {
+ private final StorageGroupMap<Source> sources
+ = new StorageGroupMap<Source>();
+ private final StorageGroupMap<StorageGroup> targets
+ = new StorageGroupMap<StorageGroup>();
+ private final EnumMap<StorageType, List<StorageGroup>> targetStorageTypeMap
+ = new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
+
+ private StorageMap() {
+ for(StorageType t : StorageType.asList()) {
+ targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
+ }
+ }
+
+ private void add(Source source, StorageGroup target) {
+ sources.put(source);
+ targets.put(target);
+ getTargetStorages(target.getStorageType()).add(target);
+ }
+
+ private Source getSource(MLocation ml) {
+ return get(sources, ml);
+ }
+
+ private StorageGroup getTarget(MLocation ml) {
+ return get(targets, ml);
+ }
+
+ private static <G extends StorageGroup> G get(StorageGroupMap<G> map,
+ MLocation ml) {
+ return map.get(ml.datanode.getDatanodeUuid(), ml.storageType);
+ }
+
+ private List<StorageGroup> getTargetStorages(StorageType t) {
+ return targetStorageTypeMap.get(t);
+ }
+ }
+
+ private final Dispatcher dispatcher;
+ private final StorageMap storages;
+
+ private final BlockStoragePolicy.Suite blockStoragePolicies;
+
+ Mover(NameNodeConnector nnc, Configuration conf) {
+ final long movedWinWidth = conf.getLong(
+ DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
+ DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
+ final int moverThreads = conf.getInt(
+ DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
+ DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+ final int maxConcurrentMovesPerNode = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+ this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
+ Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
+ maxConcurrentMovesPerNode, conf);
+ this.storages = new StorageMap();
+ this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
+ }
+
+ private ExitStatus run() {
+ try {
+ final List<DatanodeStorageReport> reports = dispatcher.init();
+ for(DatanodeStorageReport r : reports) {
+ final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+ for(StorageType t : StorageType.asList()) {
+ final long maxRemaining = getMaxRemaining(r, t);
+ if (maxRemaining > 0L) {
+ final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
+ final StorageGroup target = dn.addTarget(t, maxRemaining);
+ storages.add(source, target);
+ }
+ }
+ }
+
+ new Processor().processNamespace();
+
+ return ExitStatus.IN_PROGRESS;
+ } catch (IllegalArgumentException e) {
+ System.out.println(e + ". Exiting ...");
+ return ExitStatus.ILLEGAL_ARGUMENTS;
+ } catch (IOException e) {
+ System.out.println(e + ". Exiting ...");
+ return ExitStatus.IO_EXCEPTION;
+ } finally {
+ dispatcher.shutdownNow();
+ }
+ }
+
+ private static long getMaxRemaining(DatanodeStorageReport report,
+ StorageType t) {
+ long max = 0L;
+ for(StorageReport r : report.getStorageReports()) {
+ if (r.getStorage().getStorageType() == t) {
+ if (r.getRemaining() > max) {
+ max = r.getRemaining();
+ }
+ }
+ }
+ return max;
+ }
+
+ private class Processor {
+ private final DFSClient dfs;
+
+ private Processor() {
+ dfs = dispatcher.getDistributedFileSystem().getClient();
+ }
+
+ private void processNamespace() {
+ try {
+ processDirRecursively("", dfs.getFileInfo("/"));
+ } catch (IOException e) {
+ LOG.warn("Failed to get root directory status. Ignore and continue.",
e);
+ }
+ }
+
+ private void processDirRecursively(String parent, HdfsFileStatus status) {
+ if (status.isSymlink()) {
+ return; //ignore symlinks
+ } else if (status.isDir()) {
+ String dir = status.getFullName(parent);
+ if (!dir.endsWith(Path.SEPARATOR)) {
+ dir = dir + Path.SEPARATOR;
+ }
+
+ for(byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+ final DirectoryListing children;
+ try {
+ children = dfs.listPaths(dir, lastReturnedName, true);
+ } catch(IOException e) {
+ LOG.warn("Failed to list directory " + dir
+ + ". Ignore the directory and continue.", e);
+ return;
+ }
+ if (children == null) {
+ return;
+ }
+ for (HdfsFileStatus child : children.getPartialListing()) {
+ processDirRecursively(dir, child);
+ }
+ if (children.hasMore()) {
+ lastReturnedName = children.getLastName();
+ } else {
+ return;
+ }
+ }
+ } else { // file
+ processFile(parent, (HdfsLocatedFileStatus)status);
+ }
+ }
+
+ private void processFile(String parent, HdfsLocatedFileStatus status) {
+ final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
+ status.getStoragePolicy());
+ final List<StorageType> types = policy.chooseStorageTypes(
+ status.getReplication());
+
+ final LocatedBlocks locations = status.getBlockLocations();
+ for(LocatedBlock lb : locations.getLocatedBlocks()) {
+ final StorageTypeDiff diff = new StorageTypeDiff(types,
+ lb.getStorageTypes());
+ if (!diff.removeOverlap()) {
+ scheduleMoves4Block(diff, lb);
+ }
+ }
+ }
+
+ void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+ final List<MLocation> locations = MLocation.toLocations(lb);
+ Collections.shuffle(locations);
+
+ final DBlock db = new DBlock(lb.getBlock().getLocalBlock());
+ for(MLocation ml : locations) {
+ db.addLocation(storages.getTarget(ml));
+ }
+
+ for(final Iterator<StorageType> i = diff.existing.iterator();
+ i.hasNext(); ) {
+ final StorageType t = i.next();
+ for(final Iterator<MLocation> j = locations.iterator(); j.hasNext(); ) {
+ final MLocation ml = j.next();
+ final Source source = storages.getSource(ml);
+ if (ml.storageType == t) {
+ // try to schedule replica move.
+ if (scheduleMoveReplica(db, ml, source, diff.expected)) {
+ i.remove();
+ j.remove();
+ }
+ }
+ }
+ }
+ }
+
+ boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
+ List<StorageType> targetTypes) {
+ if (dispatcher.getCluster().isNodeGroupAware()) {
+ if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
+ return true;
+ }
+ }
+
+ // Then, match nodes on the same rack
+ if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) {
+ return true;
+ }
+ // At last, match all remaining nodes
+ if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) {
+ return true;
+ }
+ return false;
+ }
+
+ boolean chooseTarget(DBlock db, MLocation ml, Source source,
+ List<StorageType> targetTypes, Matcher matcher) {
+ final NetworkTopology cluster = dispatcher.getCluster();
+ for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
+ final StorageType t = i.next();
+ for(StorageGroup target : storages.getTargetStorages(t)) {
+ if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())
+ && dispatcher.isGoodBlockCandidate(source, target, t, db)) {
+ final PendingMove pm = dispatcher.new PendingMove(db, source,
+ target);
+ if (pm.chooseProxySource()) {
+ i.remove();
+ target.incScheduledSize(ml.size);
+ dispatcher.executePendingMove(pm);
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+ }
+
+
+ static class MLocation {
+ final DatanodeInfo datanode;
+ final StorageType storageType;
+ final long size;
+
+ MLocation(DatanodeInfo datanode, StorageType storageType, long size) {
+ this.datanode = datanode;
+ this.storageType = storageType;
+ this.size = size;
+ }
+
+ static List<MLocation> toLocations(LocatedBlock lb) {
+ final DatanodeInfo[] datanodeInfos = lb.getLocations();
+ final StorageType[] storageTypes = lb.getStorageTypes();
+ final long size = lb.getBlockSize();
+ final List<MLocation> locations = new LinkedList<MLocation>();
+ for(int i = 0; i < datanodeInfos.length; i++) {
+ locations.add(new MLocation(datanodeInfos[i], storageTypes[i], size));
+ }
+ return locations;
+ }
+ }
+
+ private static class StorageTypeDiff {
+ final List<StorageType> expected;
+ final List<StorageType> existing;
+
+ StorageTypeDiff(List<StorageType> expected, StorageType[] existing) {
+ this.expected = new LinkedList<StorageType>(expected);
+ this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
+ }
+
+ /**
+ * Remove the overlap between the expected types and the existing types.
+ * @return true if the existing types list is empty after removing the overlap.
+ */
+ boolean removeOverlap() {
+ for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
+ final StorageType t = i.next();
+ if (expected.remove(t)) {
+ i.remove();
+ }
+ }
+ return existing.isEmpty();
+ }
+ }
+
+ static int run(Collection<URI> namenodes, Configuration conf)
+ throws IOException, InterruptedException {
+ final long sleeptime = 2000*conf.getLong(
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+ LOG.info("namenodes = " + namenodes);
+
+ List<NameNodeConnector> connectors = Collections.emptyList();
+ try {
+ connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
+ Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
+
+ while (true) {
+ Collections.shuffle(connectors);
+ for(NameNodeConnector nnc : connectors) {
+ final Mover m = new Mover(nnc, conf);
+ final ExitStatus r = m.run();
+
+ if (r != ExitStatus.IN_PROGRESS) {
+ //must be an error status, return.
+ return r.getExitCode();
+ }
+ }
+
+ Thread.sleep(sleeptime);
+ }
+ } finally {
+ for(NameNodeConnector nnc : connectors) {
+ IOUtils.cleanup(LOG, nnc);
+ }
+ }
+ }
+
+ static class Cli extends Configured implements Tool {
+ private static final String USAGE = "Usage: java "
+ + Mover.class.getSimpleName();
+
+ @Override
+ public int run(String[] args) throws Exception {
+ final long startTime = Time.monotonicNow();
+ final Configuration conf = getConf();
+
+ try {
+ final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ return Mover.run(namenodes, conf);
+ } catch (IOException e) {
+ System.out.println(e + ". Exiting ...");
+ return ExitStatus.IO_EXCEPTION.getExitCode();
+ } catch (InterruptedException e) {
+ System.out.println(e + ". Exiting ...");
+ return ExitStatus.INTERRUPTED.getExitCode();
+ } finally {
+ System.out.format("%-24s ",
DateFormat.getDateTimeInstance().format(new Date()));
+ System.out.println("Mover took " +
StringUtils.formatTime(Time.monotonicNow()-startTime));
+ }
+ }
+
+ /**
+ * Run a Mover in command line.
+ *
+ * @param args Command line arguments
+ */
+ public static void main(String[] args) {
+ if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
+ System.exit(0);
+ }
+
+ try {
+ System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
+ } catch (Throwable e) {
+ LOG.error("Exiting " + Mover.class.getSimpleName()
+ + " due to an exception", e);
+ System.exit(-1);
+ }
+ }
+ }
+}
\ No newline at end of file
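
Note: a worked example of the StorageTypeDiff logic above, reimplemented standalone for illustration. It assumes an ARCHIVE storage type, which this branch introduces elsewhere; any two StorageType values show the same behaviour:

import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.hdfs.StorageType;

class StorageTypeDiffSketch {
  public static void main(String[] args) {
    // What the policy wants for a replication-3 block ...
    final List<StorageType> expected = new LinkedList<StorageType>(Arrays.asList(
        StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE));
    // ... and where its replicas currently sit.
    final List<StorageType> existing = new LinkedList<StorageType>(Arrays.asList(
        StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE));

    // Same overlap removal as StorageTypeDiff.removeOverlap(): each type that
    // appears on both sides is cancelled out once.
    for (Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
      if (expected.remove(i.next())) {
        i.remove();
      }
    }
    // expected = [ARCHIVE, ARCHIVE], existing = [DISK, DISK]: existing is not
    // empty, so scheduleMoves4Block() would try to move two replicas off DISK
    // onto ARCHIVE storages.
    System.out.println("still needed: " + expected + ", to move from: " + existing);
  }
}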