Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Aug 19 23:49:39 2014 @@ -234,18 +234,6 @@ public class DatanodeDescriptor extends updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0); } - /** - * Add data-node to the block. Add block to the head of the list of blocks - * belonging to the data-node. - */ - public boolean addBlock(String storageID, BlockInfo b) { - DatanodeStorageInfo s = getStorageInfo(storageID); - if (s != null) { - return s.addBlock(b); - } - return false; - } - @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { @@ -259,6 +247,15 @@ public class DatanodeDescriptor extends } } + public StorageReport[] getStorageReports() { + final DatanodeStorageInfo[] infos = getStorageInfos(); + final StorageReport[] reports = new StorageReport[infos.length]; + for(int i = 0; i < infos.length; i++) { + reports[i] = infos[i].toStorageReport(); + } + return reports; + } + boolean hasStaleStorages() { synchronized (storageMap) { for (DatanodeStorageInfo storage : storageMap.values()) { @@ -275,13 +272,10 @@ public class DatanodeDescriptor extends * data-node from the block. */ boolean removeBlock(BlockInfo b) { - int index = b.findStorageInfo(this); + final DatanodeStorageInfo s = b.findStorageInfo(this); // if block exists on this datanode - if (index >= 0) { - DatanodeStorageInfo s = b.getStorageInfo(index); - if (s != null) { - return s.removeBlock(b); - } + if (s != null) { + return s.removeBlock(b); } return false; } @@ -298,24 +292,6 @@ public class DatanodeDescriptor extends return false; } - /** - * Replace specified old block with a new one in the DataNodeDescriptor. - * - * @param oldBlock - block to be replaced - * @param newBlock - a replacement block - * @return the new block - */ - public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) { - int index = oldBlock.findStorageInfo(this); - DatanodeStorageInfo s = oldBlock.getStorageInfo(index); - boolean done = s.removeBlock(oldBlock); - assert done : "Old block should belong to the data-node when replacing"; - - done = s.addBlock(newBlock); - assert done : "New block should not belong to the data-node when replacing"; - return newBlock; - } - public void resetBlocks() { setCapacity(0); setRemaining(0);
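The DatanodeDescriptor.java hunks above remove addBlock(String, BlockInfo) and replaceBlock(...), and simplify removeBlock(BlockInfo): BlockInfo.findStorageInfo(this) now returns the DatanodeStorageInfo directly rather than an array index, so the descriptor just delegates the removal to that storage. Below is a minimal, self-contained sketch of the same delegation pattern; Block, Storage, and Node are hypothetical toy stand-ins, not the real HDFS classes.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the refactor: the block knows which storage on a given
// node holds it, so the node delegates removal without index bookkeeping.
class Block {
    private final Map<Node, Storage> locations = new HashMap<>();

    void addLocation(Node node, Storage storage) {
        locations.put(node, storage);
    }

    // Analogous to BlockInfo.findStorageInfo(DatanodeDescriptor) after the
    // change: returns the storage itself (or null), not an array index.
    Storage findStorage(Node node) {
        return locations.get(node);
    }
}

class Storage {
    private final Set<Block> blocks = new HashSet<>();
    boolean add(Block b) { return blocks.add(b); }
    boolean remove(Block b) { return blocks.remove(b); }
}

class Node {
    // Analogous to the new DatanodeDescriptor.removeBlock(BlockInfo):
    // resolve the storage for this node, then delegate.
    boolean removeBlock(Block b) {
        Storage s = b.findStorage(this);
        return s != null && s.remove(b);
    }

    public static void main(String[] args) {
        Node node = new Node();
        Storage disk = new Storage();
        Block blk = new Block();
        disk.add(blk);
        blk.addLocation(node, disk);
        System.out.println(node.removeBlock(blk)); // true: found and removed
        System.out.println(node.removeBlock(blk)); // false: storage no longer holds it
    }
}

Returning the storage object directly also drops the null-check-after-index dance the old code needed (index >= 0 followed by getStorageInfo(index)).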
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Aug 19 23:49:39 2014 @@ -135,7 +135,10 @@ public class DatanodeManager { /** The number of stale DataNodes */ private volatile int numStaleNodes; - + + /** The number of stale storages */ + private volatile int numStaleStorages; + /** * Whether or not this cluster has ever consisted of more than 1 rack, * according to the NetworkTopology. @@ -331,9 +334,22 @@ public class DatanodeManager { return heartbeatManager; } + private boolean isInactive(DatanodeInfo datanode) { + if (datanode.isDecommissioned()) { + return true; + } + + if (avoidStaleDataNodesForRead) { + return datanode.isStale(staleInterval); + } + + return false; + } + /** Sort the located blocks by the distance to the target host. */ public void sortLocatedBlocks(final String targethost, - final List<LocatedBlock> locatedblocks) { + final List<LocatedBlock> locatedblocks, + boolean randomizeBlockLocationsPerBlock) { //sort the blocks // As it is possible for the separation of node manager and datanode, // here we should get node but not datanode only . @@ -351,9 +367,17 @@ public class DatanodeManager { DFSUtil.DECOM_COMPARATOR; for (LocatedBlock b : locatedblocks) { - networktopology.pseudoSortByDistance(client, b.getLocations()); + DatanodeInfo[] di = b.getLocations(); // Move decommissioned/stale datanodes to the bottom - Arrays.sort(b.getLocations(), comparator); + Arrays.sort(di, comparator); + + int lastActiveIndex = di.length - 1; + while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) { + --lastActiveIndex; + } + int activeLen = lastActiveIndex + 1; + networktopology.sortByDistance(client, b.getLocations(), activeLen, b + .getBlock().getBlockId(), randomizeBlockLocationsPerBlock); } } @@ -373,6 +397,11 @@ public class DatanodeManager { return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort); } + /** @return the Host2NodesMap */ + public Host2NodesMap getHost2DatanodeMap() { + return this.host2DatanodeMap; + } + /** * Given datanode address or host name, returns the DatanodeDescriptor for the * same, or if it doesn't find the datanode, it looks for a machine local and @@ -435,9 +464,9 @@ public class DatanodeManager { } /** - * Get data node by storage ID. + * Get data node by datanode ID. * - * @param nodeID + * @param nodeID datanode ID * @return DatanodeDescriptor or null if the node is not found. * @throws UnregisteredNodeException */ @@ -678,6 +707,52 @@ public class DatanodeManager { } /** + * Resolve a node's dependencies in the network. 
If the DNS to switch + * mapping fails then this method returns empty list of dependencies + * @param node to get dependencies for + * @return List of dependent host names + */ + private List<String> getNetworkDependenciesWithDefault(DatanodeInfo node) { + List<String> dependencies; + try { + dependencies = getNetworkDependencies(node); + } catch (UnresolvedTopologyException e) { + LOG.error("Unresolved dependency mapping for host " + + node.getHostName() +". Continuing with an empty dependency list"); + dependencies = Collections.emptyList(); + } + return dependencies; + } + + /** + * Resolves a node's dependencies in the network. If the DNS to switch + * mapping fails to get dependencies, then this method throws + * UnresolvedTopologyException. + * @param node to get dependencies for + * @return List of dependent host names + * @throws UnresolvedTopologyException if the DNS to switch mapping fails + */ + private List<String> getNetworkDependencies(DatanodeInfo node) + throws UnresolvedTopologyException { + List<String> dependencies = Collections.emptyList(); + + if (dnsToSwitchMapping instanceof DNSToSwitchMappingWithDependency) { + //Get dependencies + dependencies = + ((DNSToSwitchMappingWithDependency)dnsToSwitchMapping).getDependency( + node.getHostName()); + if(dependencies == null) { + LOG.error("The dependency call returned null for host " + + node.getHostName()); + throw new UnresolvedTopologyException("The dependency call returned " + + "null for host " + node.getHostName()); + } + } + + return dependencies; + } + + /** * Remove an already decommissioned data node who is neither in include nor * exclude hosts lists from the the list of live or dead nodes. This is used * to not display an already decommssioned data node to the operators. @@ -749,7 +824,9 @@ public class DatanodeManager { } /** Start decommissioning the specified datanode. */ - private void startDecommission(DatanodeDescriptor node) { + @InterfaceAudience.Private + @VisibleForTesting + public void startDecommission(DatanodeDescriptor node) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { for (DatanodeStorageInfo storage : node.getStorageInfos()) { LOG.info("Start Decommissioning " + node + " " + storage @@ -869,12 +946,14 @@ public class DatanodeManager { nodeS.setDisallowed(false); // Node is in the include list // resolve network location - if(this.rejectUnresolvedTopologyDN) - { - nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + if(this.rejectUnresolvedTopologyDN) { + nodeS.setNetworkLocation(resolveNetworkLocation(nodeS)); + nodeS.setDependentHostNames(getNetworkDependencies(nodeS)); } else { nodeS.setNetworkLocation( resolveNetworkLocationWithFallBackToDefaultLocation(nodeS)); + nodeS.setDependentHostNames( + getNetworkDependenciesWithDefault(nodeS)); } getNetworkTopology().add(nodeS); @@ -900,9 +979,12 @@ public class DatanodeManager { // resolve network location if(this.rejectUnresolvedTopologyDN) { nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr)); + nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr)); } else { nodeDescr.setNetworkLocation( resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr)); + nodeDescr.setDependentHostNames( + getNetworkDependenciesWithDefault(nodeDescr)); } networktopology.add(nodeDescr); nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion()); @@ -1001,34 +1083,23 @@ public class DatanodeManager { /** @return the number of dead datanodes. 
*/ public int getNumDeadDataNodes() { - int numDead = 0; - synchronized (datanodeMap) { - for(DatanodeDescriptor dn : datanodeMap.values()) { - if (isDatanodeDead(dn) ) { - numDead++; - } - } - } - return numDead; + return getDatanodeListForReport(DatanodeReportType.DEAD).size(); } /** @return list of datanodes where decommissioning is in progress. */ public List<DatanodeDescriptor> getDecommissioningNodes() { - namesystem.readLock(); - try { - final List<DatanodeDescriptor> decommissioningNodes - = new ArrayList<DatanodeDescriptor>(); - final List<DatanodeDescriptor> results = getDatanodeListForReport( - DatanodeReportType.LIVE); - for(DatanodeDescriptor node : results) { - if (node.isDecommissionInProgress()) { - decommissioningNodes.add(node); - } + // There is no need to take namesystem reader lock as + // getDatanodeListForReport will synchronize on datanodeMap + final List<DatanodeDescriptor> decommissioningNodes + = new ArrayList<DatanodeDescriptor>(); + final List<DatanodeDescriptor> results = getDatanodeListForReport( + DatanodeReportType.LIVE); + for(DatanodeDescriptor node : results) { + if (node.isDecommissionInProgress()) { + decommissioningNodes.add(node); } - return decommissioningNodes; - } finally { - namesystem.readUnlock(); } + return decommissioningNodes; } /* Getter and Setter for stale DataNodes related attributes */ @@ -1074,6 +1145,22 @@ public class DatanodeManager { return this.numStaleNodes; } + /** + * Get the number of content stale storages. + */ + public int getNumStaleStorages() { + return numStaleStorages; + } + + /** + * Set the number of content stale storages. + * + * @param numStaleStorages The number of content stale storages. + */ + void setNumStaleStorages(int numStaleStorages) { + this.numStaleStorages = numStaleStorages; + } + /** Fetch live and dead datanodes. 
*/ public void fetchDatanodes(final List<DatanodeDescriptor> live, final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) { @@ -1081,23 +1168,20 @@ public class DatanodeManager { throw new HadoopIllegalArgumentException("Both live and dead lists are null"); } - namesystem.readLock(); - try { - final List<DatanodeDescriptor> results = - getDatanodeListForReport(DatanodeReportType.ALL); - for(DatanodeDescriptor node : results) { - if (isDatanodeDead(node)) { - if (dead != null) { - dead.add(node); - } - } else { - if (live != null) { - live.add(node); - } + // There is no need to take namesystem reader lock as + // getDatanodeListForReport will synchronize on datanodeMap + final List<DatanodeDescriptor> results = + getDatanodeListForReport(DatanodeReportType.ALL); + for(DatanodeDescriptor node : results) { + if (isDatanodeDead(node)) { + if (dead != null) { + dead.add(node); + } + } else { + if (live != null) { + live.add(node); } } - } finally { - namesystem.readUnlock(); } if (removeDecommissionNode) { @@ -1189,10 +1273,15 @@ public class DatanodeManager { /** For generating datanode reports */ public List<DatanodeDescriptor> getDatanodeListForReport( final DatanodeReportType type) { - boolean listLiveNodes = type == DatanodeReportType.ALL || - type == DatanodeReportType.LIVE; - boolean listDeadNodes = type == DatanodeReportType.ALL || - type == DatanodeReportType.DEAD; + final boolean listLiveNodes = + type == DatanodeReportType.ALL || + type == DatanodeReportType.LIVE; + final boolean listDeadNodes = + type == DatanodeReportType.ALL || + type == DatanodeReportType.DEAD; + final boolean listDecommissioningNodes = + type == DatanodeReportType.ALL || + type == DatanodeReportType.DECOMMISSIONING; ArrayList<DatanodeDescriptor> nodes; final HostFileManager.HostSet foundNodes = new HostFileManager.HostSet(); @@ -1203,7 +1292,10 @@ public class DatanodeManager { nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size()); for (DatanodeDescriptor dn : datanodeMap.values()) { final boolean isDead = isDatanodeDead(dn); - if ((listLiveNodes && !isDead) || (listDeadNodes && isDead)) { + final boolean isDecommissioning = dn.isDecommissionInProgress(); + if ((listLiveNodes && !isDead) || + (listDeadNodes && isDead) || + (listDecommissioningNodes && isDecommissioning)) { nodes.add(dn); } foundNodes.add(HostFileManager.resolvedAddressFromDatanodeID(dn)); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Tue Aug 19 23:49:39 2014 @@ -52,6 +52,12 @@ public interface DatanodeStatistics { /** @return the xceiver count */ public int getXceiverCount(); + /** @return average xceiver count for non-decommission(ing|ed) nodes */ + public int getInServiceXceiverCount(); + + /** @return number of non-decommission(ing|ed) nodes */ + public int 
getNumDatanodesInService(); + /** * @return the total used space by data nodes for non-DFS purposes * such as storing temporary files on the local file system Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Tue Aug 19 23:49:39 2014 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -206,13 +207,29 @@ public class DatanodeStorageInfo { return blockPoolUsed; } - boolean addBlock(BlockInfo b) { - if(!b.addStorage(this)) - return false; + public boolean addBlock(BlockInfo b) { + // First check whether the block belongs to a different storage + // on the same DN. + boolean replaced = false; + DatanodeStorageInfo otherStorage = + b.findStorageInfo(getDatanodeDescriptor()); + + if (otherStorage != null) { + if (otherStorage != this) { + // The block belongs to a different storage. Remove it first. + otherStorage.removeBlock(b); + replaced = true; + } else { + // The block is already associated with this storage. 
+ return false; + } + } + // add to the head of the data-node list + b.addStorage(this); blockList = b.listInsert(blockList, this); numBlocks++; - return true; + return !replaced; } boolean removeBlock(BlockInfo b) { @@ -290,4 +307,27 @@ public class DatanodeStorageInfo { public String toString() { return "[" + storageType + "]" + storageID + ":" + state; } + + StorageReport toStorageReport() { + return new StorageReport( + new DatanodeStorage(storageID, state, storageType), + false, capacity, dfsUsed, remaining, blockPoolUsed); + } + + /** @return the first {@link DatanodeStorageInfo} corresponding to + * the given datanode + */ + static DatanodeStorageInfo getDatanodeStorageInfo( + final Iterable<DatanodeStorageInfo> infos, + final DatanodeDescriptor datanode) { + if (datanode == null) { + return null; + } + for(DatanodeStorageInfo storage : infos) { + if (storage.getDatanodeDescriptor() == datanode) { + return storage; + } + } + return null; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Tue Aug 19 23:49:39 2014 @@ -151,6 +151,16 @@ class HeartbeatManager implements Datano } @Override + public synchronized int getInServiceXceiverCount() { + return stats.nodesInServiceXceiverCount; + } + + @Override + public synchronized int getNumDatanodesInService() { + return stats.nodesInService; + } + + @Override public synchronized long getCacheCapacity() { return stats.cacheCapacity; } @@ -178,7 +188,7 @@ class HeartbeatManager implements Datano } synchronized void register(final DatanodeDescriptor d) { - if (!datanodes.contains(d)) { + if (!d.isAlive) { addDatanode(d); //update its timestamp @@ -191,6 +201,8 @@ class HeartbeatManager implements Datano } synchronized void addDatanode(final DatanodeDescriptor d) { + // update in-service node count + stats.add(d); datanodes.add(d); d.isAlive = true; } @@ -244,6 +256,7 @@ class HeartbeatManager implements Datano DatanodeID dead = null; // check the number of stale nodes int numOfStaleNodes = 0; + int numOfStaleStorages = 0; synchronized(this) { for (DatanodeDescriptor d : datanodes) { if (dead == null && dm.isDatanodeDead(d)) { @@ -253,10 +266,17 @@ class HeartbeatManager implements Datano if (d.isStale(dm.getStaleInterval())) { numOfStaleNodes++; } + DatanodeStorageInfo[] storageInfos = d.getStorageInfos(); + for(DatanodeStorageInfo storageInfo : storageInfos) { + if (storageInfo.areBlockContentsStale()) { + numOfStaleStorages++; + } + } } // Set the number of stale nodes in the DatanodeManager dm.setNumStaleNodes(numOfStaleNodes); + dm.setNumStaleStorages(numOfStaleStorages); } allAlive = dead == null; @@ -323,6 +343,9 @@ class HeartbeatManager implements Datano private long cacheCapacity = 0L; private long cacheUsed = 0L; + private int nodesInService = 0; + private int nodesInServiceXceiverCount = 0; + private int 
expiredHeartbeats = 0; private void add(final DatanodeDescriptor node) { @@ -330,6 +353,8 @@ class HeartbeatManager implements Datano blockPoolUsed += node.getBlockPoolUsed(); xceiverCount += node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService++; + nodesInServiceXceiverCount += node.getXceiverCount(); capacityTotal += node.getCapacity(); capacityRemaining += node.getRemaining(); } else { @@ -344,6 +369,8 @@ class HeartbeatManager implements Datano blockPoolUsed -= node.getBlockPoolUsed(); xceiverCount -= node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService--; + nodesInServiceXceiverCount -= node.getXceiverCount(); capacityTotal -= node.getCapacity(); capacityRemaining -= node.getRemaining(); } else { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Tue Aug 19 23:49:39 2014 @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.DFSUtil; @InterfaceAudience.Private @InterfaceStability.Evolving class Host2NodesMap { + private HashMap<String, String> mapHost = new HashMap<String, String>(); private final HashMap<String, DatanodeDescriptor[]> map = new HashMap<String, DatanodeDescriptor[]>(); private final ReadWriteLock hostmapLock = new ReentrantReadWriteLock(); @@ -69,6 +70,10 @@ class Host2NodesMap { } String ipAddr = node.getIpAddr(); + String hostname = node.getHostName(); + + mapHost.put(hostname, ipAddr); + DatanodeDescriptor[] nodes = map.get(ipAddr); DatanodeDescriptor[] newNodes; if (nodes==null) { @@ -95,6 +100,7 @@ class Host2NodesMap { } String ipAddr = node.getIpAddr(); + String hostname = node.getHostName(); hostmapLock.writeLock().lock(); try { @@ -105,6 +111,8 @@ class Host2NodesMap { if (nodes.length==1) { if (nodes[0]==node) { map.remove(ipAddr); + //remove hostname key since last datanode is removed + mapHost.remove(hostname); return true; } else { return false; @@ -188,12 +196,40 @@ class Host2NodesMap { } } + + + /** get a data node by its hostname. This should be used if only one + * datanode service is running on a hostname. If multiple datanodes + * are running on a hostname then use methods getDataNodeByXferAddr and + * getDataNodeByHostNameAndPort. + * @return DatanodeDescriptor if found; otherwise null. 
+ */ + DatanodeDescriptor getDataNodeByHostName(String hostname) { + if(hostname == null) { + return null; + } + + hostmapLock.readLock().lock(); + try { + String ipAddr = mapHost.get(hostname); + if(ipAddr == null) { + return null; + } else { + return getDatanodeByHost(ipAddr); + } + } finally { + hostmapLock.readLock().unlock(); + } + } + @Override public String toString() { final StringBuilder b = new StringBuilder(getClass().getSimpleName()) .append("["); - for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) { - b.append("\n " + e.getKey() + " => " + Arrays.asList(e.getValue())); + for(Map.Entry<String, String> host: mapHost.entrySet()) { + DatanodeDescriptor[] e = map.get(host.getValue()); + b.append("\n " + host.getKey() + " => "+host.getValue() + " => " + + Arrays.asList(e)); } return b.append("\n]").toString(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Tue Aug 19 23:49:39 2014 @@ -18,16 +18,25 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.PrintWriter; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; +import java.util.GregorianCalendar; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.util.LightWeightHashSet; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.hdfs.DFSUtil; + +import com.google.common.annotations.VisibleForTesting; /** * Keeps a Collection for every named machine containing blocks @@ -36,16 +45,36 @@ import org.apache.hadoop.hdfs.util.Light */ @InterfaceAudience.Private class InvalidateBlocks { - /** Mapping: StorageID -> Collection of Blocks */ - private final Map<String, LightWeightHashSet<Block>> node2blocks = - new TreeMap<String, LightWeightHashSet<Block>>(); + /** Mapping: DatanodeInfo -> Collection of Blocks */ + private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks = + new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>(); /** The total number of blocks in the map. 
*/ private long numBlocks = 0L; - private final DatanodeManager datanodeManager; + private final int blockInvalidateLimit; - InvalidateBlocks(final DatanodeManager datanodeManager) { - this.datanodeManager = datanodeManager; + /** + * The period of pending time for block invalidation since the NameNode + * startup + */ + private final long pendingPeriodInMs; + /** the startup time */ + private final long startupTime = Time.monotonicNow(); + + InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) { + this.blockInvalidateLimit = blockInvalidateLimit; + this.pendingPeriodInMs = pendingPeriodInMs; + printBlockDeletionTime(BlockManager.LOG); + } + + private void printBlockDeletionTime(final Log log) { + log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY + + " is set to " + DFSUtil.durationToString(pendingPeriodInMs)); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss"); + Calendar calendar = new GregorianCalendar(); + calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000)); + log.info("The block deletion will start around " + + sdf.format(calendar.getTime())); } /** @return the number of blocks to be invalidated . */ @@ -58,12 +87,9 @@ class InvalidateBlocks { * invalidation. Blocks are compared including their generation stamps: * if a block is pending invalidation but with a different generation stamp, * returns false. - * @param storageID the storage to check - * @param the block to look for - * */ - synchronized boolean contains(final String storageID, final Block block) { - final LightWeightHashSet<Block> s = node2blocks.get(storageID); + synchronized boolean contains(final DatanodeInfo dn, final Block block) { + final LightWeightHashSet<Block> s = node2blocks.get(dn); if (s == null) { return false; // no invalidate blocks for this storage ID } @@ -78,10 +104,10 @@ class InvalidateBlocks { */ synchronized void add(final Block block, final DatanodeInfo datanode, final boolean log) { - LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid()); + LightWeightHashSet<Block> set = node2blocks.get(datanode); if (set == null) { set = new LightWeightHashSet<Block>(); - node2blocks.put(datanode.getDatanodeUuid(), set); + node2blocks.put(datanode, set); } if (set.add(block)) { numBlocks++; @@ -93,20 +119,20 @@ class InvalidateBlocks { } /** Remove a storage from the invalidatesSet */ - synchronized void remove(final String storageID) { - final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID); + synchronized void remove(final DatanodeInfo dn) { + final LightWeightHashSet<Block> blocks = node2blocks.remove(dn); if (blocks != null) { numBlocks -= blocks.size(); } } /** Remove the block from the specified storage. 
*/ - synchronized void remove(final String storageID, final Block block) { - final LightWeightHashSet<Block> v = node2blocks.get(storageID); + synchronized void remove(final DatanodeInfo dn, final Block block) { + final LightWeightHashSet<Block> v = node2blocks.get(dn); if (v != null && v.remove(block)) { numBlocks--; if (v.isEmpty()) { - node2blocks.remove(storageID); + node2blocks.remove(dn); } } } @@ -120,34 +146,50 @@ class InvalidateBlocks { return; } - for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) { + for(Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry : node2blocks.entrySet()) { final LightWeightHashSet<Block> blocks = entry.getValue(); if (blocks.size() > 0) { - out.println(datanodeManager.getDatanode(entry.getKey())); + out.println(entry.getKey()); out.println(blocks); } } } /** @return a list of the storage IDs. */ - synchronized List<String> getStorageIDs() { - return new ArrayList<String>(node2blocks.keySet()); + synchronized List<DatanodeInfo> getDatanodes() { + return new ArrayList<DatanodeInfo>(node2blocks.keySet()); + } + + /** + * @return the remianing pending time + */ + @VisibleForTesting + long getInvalidationDelay() { + return pendingPeriodInMs - (Time.monotonicNow() - startupTime); } - synchronized List<Block> invalidateWork( - final String storageId, final DatanodeDescriptor dn) { - final LightWeightHashSet<Block> set = node2blocks.get(storageId); + synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) { + final long delay = getInvalidationDelay(); + if (delay > 0) { + if (BlockManager.LOG.isDebugEnabled()) { + BlockManager.LOG + .debug("Block deletion is delayed during NameNode startup. " + + "The deletion will start after " + delay + " ms."); + } + return null; + } + final LightWeightHashSet<Block> set = node2blocks.get(dn); if (set == null) { return null; } // # blocks that can be sent in one message is limited - final int limit = datanodeManager.blockInvalidateLimit; + final int limit = blockInvalidateLimit; final List<Block> toInvalidate = set.pollN(limit); // If we send everything in this message, remove this node entry if (set.isEmpty()) { - remove(storageId); + remove(dn); } dn.addBlocksToBeInvalidated(toInvalidate); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Tue Aug 19 23:49:39 2014 @@ -23,6 +23,7 @@ import java.util.Queue; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,14 +42,12 @@ class PendingDataNodeMessages { static class ReportedBlockInfo { private final Block block; - private final DatanodeDescriptor dn; - private final String storageID; + private final DatanodeStorageInfo 
storageInfo; private final ReplicaState reportedState; - ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block, + ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState) { - this.dn = dn; - this.storageID = storageID; + this.storageInfo = storageInfo; this.block = block; this.reportedState = reportedState; } @@ -57,30 +56,48 @@ class PendingDataNodeMessages { return block; } - DatanodeDescriptor getNode() { - return dn; - } - - String getStorageID() { - return storageID; - } - ReplicaState getReportedState() { return reportedState; } + + DatanodeStorageInfo getStorageInfo() { + return storageInfo; + } @Override public String toString() { - return "ReportedBlockInfo [block=" + block + ", dn=" + dn + return "ReportedBlockInfo [block=" + block + ", dn=" + + storageInfo.getDatanodeDescriptor() + ", reportedState=" + reportedState + "]"; } } - void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + /** + * Remove all pending DN messages which reference the given DN. + * @param dn the datanode whose messages we should remove. + */ + void removeAllMessagesForDatanode(DatanodeDescriptor dn) { + for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry : + queueByBlockId.entrySet()) { + Queue<ReportedBlockInfo> newQueue = Lists.newLinkedList(); + Queue<ReportedBlockInfo> oldQueue = entry.getValue(); + while (!oldQueue.isEmpty()) { + ReportedBlockInfo rbi = oldQueue.remove(); + if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) { + newQueue.add(rbi); + } else { + count--; + } + } + queueByBlockId.put(entry.getKey(), newQueue); + } + } + + void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState) { block = new Block(block); getBlockQueue(block).add( - new ReportedBlockInfo(dn, storageID, block, reportedState)); + new ReportedBlockInfo(storageInfo, block, reportedState)); count++; } @@ -106,7 +123,7 @@ class PendingDataNodeMessages { return queue; } - public int count() { + int count() { return count ; } @@ -123,7 +140,7 @@ class PendingDataNodeMessages { return sb.toString(); } - public Iterable<ReportedBlockInfo> takeAll() { + Iterable<ReportedBlockInfo> takeAll() { List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity( count); for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Tue Aug 19 23:49:39 2014 @@ -92,7 +92,9 @@ public final class HdfsServerConstants { RECOVER ("-recover"), FORCE("-force"), NONINTERACTIVE("-nonInteractive"), - RENAMERESERVED("-renameReserved"); + RENAMERESERVED("-renameReserved"), + METADATAVERSION("-metadataVersion"), + UPGRADEONLY("-upgradeOnly"); private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile( "(\\w+)\\((\\w+)\\)"); Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Tue Aug 19 23:49:39 2014 @@ -18,544 +18,45 @@ package org.apache.hadoop.hdfs.server.common; -import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URL; -import java.net.URLEncoder; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; - -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.jsp.JspWriter; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.BlockReaderFactory; -import org.apache.hadoop.hdfs.ClientContext; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.RemotePeerFactory; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer; import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.DoAsParam; import org.apache.hadoop.hdfs.web.resources.UserParam; -import org.apache.hadoop.http.HtmlQuoting; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.hadoop.security.authorize.ProxyServers; import 
org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.VersionInfo; -import com.google.common.base.Charsets; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; @InterfaceAudience.Private public class JspHelper { public static final String CURRENT_CONF = "current.conf"; public static final String DELEGATION_PARAMETER_NAME = DelegationParam.NAME; public static final String NAMENODE_ADDRESS = "nnaddr"; - static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME + - "="; private static final Log LOG = LogFactory.getLog(JspHelper.class); /** Private constructor for preventing creating JspHelper object. */ - private JspHelper() {} - - // data structure to count number of blocks on datanodes. - private static class NodeRecord extends DatanodeInfo { - int frequency; - - public NodeRecord(DatanodeInfo info, int count) { - super(info); - this.frequency = count; - } - - @Override - public boolean equals(Object obj) { - // Sufficient to use super equality as datanodes are uniquely identified - // by DatanodeID - return (this == obj) || super.equals(obj); - } - @Override - public int hashCode() { - // Super implementation is sufficient - return super.hashCode(); - } - } - - // compare two records based on their frequency - private static class NodeRecordComparator implements Comparator<NodeRecord> { - - @Override - public int compare(NodeRecord o1, NodeRecord o2) { - if (o1.frequency < o2.frequency) { - return -1; - } else if (o1.frequency > o2.frequency) { - return 1; - } - return 0; - } - } - - /** - * convenience method for canonicalizing host name. - * @param addr name:port or name - * @return canonicalized host name - */ - public static String canonicalize(String addr) { - // default port 1 is supplied to allow addr without port. - // the port will be ignored. - return NetUtils.createSocketAddr(addr, 1).getAddress() - .getCanonicalHostName(); - } - - /** - * A helper class that generates the correct URL for different schema. - * - */ - public static final class Url { - public static String authority(String scheme, DatanodeID d) { - String fqdn = (d.getIpAddr() != null && !d.getIpAddr().isEmpty())? 
- canonicalize(d.getIpAddr()): - d.getHostName(); - if (scheme.equals("http")) { - return fqdn + ":" + d.getInfoPort(); - } else if (scheme.equals("https")) { - return fqdn + ":" + d.getInfoSecurePort(); - } else { - throw new IllegalArgumentException("Unknown scheme:" + scheme); - } - } - - public static String url(String scheme, DatanodeID d) { - return scheme + "://" + authority(scheme, d); - } - } - - public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf) - throws IOException { - HashMap<DatanodeInfo, NodeRecord> map = - new HashMap<DatanodeInfo, NodeRecord>(); - for (LocatedBlock block : blks.getLocatedBlocks()) { - DatanodeInfo[] nodes = block.getLocations(); - for (DatanodeInfo node : nodes) { - NodeRecord record = map.get(node); - if (record == null) { - map.put(node, new NodeRecord(node, 1)); - } else { - record.frequency++; - } - } - } - NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]); - Arrays.sort(nodes, new NodeRecordComparator()); - return bestNode(nodes, false); - } - - public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf) - throws IOException { - DatanodeInfo[] nodes = blk.getLocations(); - return bestNode(nodes, true); - } - - private static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom) - throws IOException { - if (nodes == null || nodes.length == 0) { - throw new IOException("No nodes contain this block"); - } - int l = 0; - while (l < nodes.length && !nodes[l].isDecommissioned()) { - ++l; - } - - if (l == 0) { - throw new IOException("No active nodes contain this block"); - } - - int index = doRandom ? DFSUtil.getRandom().nextInt(l) : 0; - return nodes[index]; - } - - public static void streamBlockInAscii(InetSocketAddress addr, String poolId, - long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp, - long blockSize, long offsetIntoBlock, long chunkSizeToView, - JspWriter out, final Configuration conf, DFSClient.Conf dfsConf, - final DataEncryptionKey encryptionKey) - throws IOException { - if (chunkSizeToView == 0) return; - int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock); - - BlockReader blockReader = new BlockReaderFactory(dfsConf). - setInetSocketAddress(addr). - setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)). - setFileName(BlockReaderFactory.getFileName(addr, poolId, blockId)). - setBlockToken(blockToken). - setStartOffset(offsetIntoBlock). - setLength(amtToRead). - setVerifyChecksum(true). - setClientName("JspHelper"). - setClientCacheContext(ClientContext.getFromConf(conf)). - setDatanodeInfo(new DatanodeInfo( - new DatanodeID(addr.getAddress().getHostAddress(), - addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))). - setCachingStrategy(CachingStrategy.newDefaultStrategy()). - setConfiguration(conf). - setRemotePeerFactory(new RemotePeerFactory() { - @Override - public Peer newConnectedPeer(InetSocketAddress addr) - throws IOException { - Peer peer = null; - Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); - try { - sock.connect(addr, HdfsServerConstants.READ_TIMEOUT); - sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey); - } finally { - if (peer == null) { - IOUtils.closeSocket(sock); - } - } - return peer; - } - }). 
- build(); - - final byte[] buf = new byte[amtToRead]; - try { - int readOffset = 0; - int retries = 2; - while (amtToRead > 0) { - int numRead = amtToRead; - try { - blockReader.readFully(buf, readOffset, amtToRead); - } catch (IOException e) { - retries--; - if (retries == 0) - throw new IOException("Could not read data from datanode"); - continue; - } - amtToRead -= numRead; - readOffset += numRead; - } - } finally { - blockReader.close(); - } - out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); - } - - public static void addTableHeader(JspWriter out) throws IOException { - out.print("<table border=\"1\""+ - " cellpadding=\"2\" cellspacing=\"2\">"); - out.print("<tbody>"); - } - public static void addTableRow(JspWriter out, String[] columns) throws IOException { - out.print("<tr>"); - for (int i = 0; i < columns.length; i++) { - out.print("<td style=\"vertical-align: top;\"><B>"+columns[i]+"</B><br></td>"); - } - out.print("</tr>"); - } - public static void addTableRow(JspWriter out, String[] columns, int row) throws IOException { - out.print("<tr>"); - - for (int i = 0; i < columns.length; i++) { - if (row/2*2 == row) {//even - out.print("<td style=\"vertical-align: top;background-color:LightGrey;\"><B>"+columns[i]+"</B><br></td>"); - } else { - out.print("<td style=\"vertical-align: top;background-color:LightBlue;\"><B>"+columns[i]+"</B><br></td>"); - - } - } - out.print("</tr>"); - } - public static void addTableFooter(JspWriter out) throws IOException { - out.print("</tbody></table>"); - } - - public static void sortNodeList(final List<DatanodeDescriptor> nodes, - String field, String order) { - - class NodeComapare implements Comparator<DatanodeDescriptor> { - static final int - FIELD_NAME = 1, - FIELD_LAST_CONTACT = 2, - FIELD_BLOCKS = 3, - FIELD_CAPACITY = 4, - FIELD_USED = 5, - FIELD_PERCENT_USED = 6, - FIELD_NONDFS_USED = 7, - FIELD_REMAINING = 8, - FIELD_PERCENT_REMAINING = 9, - FIELD_ADMIN_STATE = 10, - FIELD_DECOMMISSIONED = 11, - FIELD_BLOCKPOOL_USED = 12, - FIELD_PERBLOCKPOOL_USED = 13, - FIELD_FAILED_VOLUMES = 14, - SORT_ORDER_ASC = 1, - SORT_ORDER_DSC = 2; - - int sortField = FIELD_NAME; - int sortOrder = SORT_ORDER_ASC; - - public NodeComapare(String field, String order) { - if (field.equals("lastcontact")) { - sortField = FIELD_LAST_CONTACT; - } else if (field.equals("capacity")) { - sortField = FIELD_CAPACITY; - } else if (field.equals("used")) { - sortField = FIELD_USED; - } else if (field.equals("nondfsused")) { - sortField = FIELD_NONDFS_USED; - } else if (field.equals("remaining")) { - sortField = FIELD_REMAINING; - } else if (field.equals("pcused")) { - sortField = FIELD_PERCENT_USED; - } else if (field.equals("pcremaining")) { - sortField = FIELD_PERCENT_REMAINING; - } else if (field.equals("blocks")) { - sortField = FIELD_BLOCKS; - } else if (field.equals("adminstate")) { - sortField = FIELD_ADMIN_STATE; - } else if (field.equals("decommissioned")) { - sortField = FIELD_DECOMMISSIONED; - } else if (field.equals("bpused")) { - sortField = FIELD_BLOCKPOOL_USED; - } else if (field.equals("pcbpused")) { - sortField = FIELD_PERBLOCKPOOL_USED; - } else if (field.equals("volfails")) { - sortField = FIELD_FAILED_VOLUMES; - } else { - sortField = FIELD_NAME; - } - - if (order.equals("DSC")) { - sortOrder = SORT_ORDER_DSC; - } else { - sortOrder = SORT_ORDER_ASC; - } - } - - @Override - public int compare(DatanodeDescriptor d1, - DatanodeDescriptor d2) { - int ret = 0; - switch (sortField) { - case FIELD_LAST_CONTACT: - ret = (int) 
(d2.getLastUpdate() - d1.getLastUpdate()); - break; - case FIELD_CAPACITY: - long dlong = d1.getCapacity() - d2.getCapacity(); - ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0); - break; - case FIELD_USED: - dlong = d1.getDfsUsed() - d2.getDfsUsed(); - ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0); - break; - case FIELD_NONDFS_USED: - dlong = d1.getNonDfsUsed() - d2.getNonDfsUsed(); - ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0); - break; - case FIELD_REMAINING: - dlong = d1.getRemaining() - d2.getRemaining(); - ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0); - break; - case FIELD_PERCENT_USED: - double ddbl =((d1.getDfsUsedPercent())- - (d2.getDfsUsedPercent())); - ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0); - break; - case FIELD_PERCENT_REMAINING: - ddbl =((d1.getRemainingPercent())- - (d2.getRemainingPercent())); - ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0); - break; - case FIELD_BLOCKS: - ret = d1.numBlocks() - d2.numBlocks(); - break; - case FIELD_ADMIN_STATE: - ret = d1.getAdminState().toString().compareTo( - d2.getAdminState().toString()); - break; - case FIELD_DECOMMISSIONED: - ret = DFSUtil.DECOM_COMPARATOR.compare(d1, d2); - break; - case FIELD_NAME: - ret = d1.getHostName().compareTo(d2.getHostName()); - break; - case FIELD_BLOCKPOOL_USED: - dlong = d1.getBlockPoolUsed() - d2.getBlockPoolUsed(); - ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0); - break; - case FIELD_PERBLOCKPOOL_USED: - ddbl = d1.getBlockPoolUsedPercent() - d2.getBlockPoolUsedPercent(); - ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0); - break; - case FIELD_FAILED_VOLUMES: - int dint = d1.getVolumeFailures() - d2.getVolumeFailures(); - ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0); - break; - default: - throw new IllegalArgumentException("Invalid sortField"); - } - return (sortOrder == SORT_ORDER_DSC) ? 
-ret : ret; - } - } - - Collections.sort(nodes, new NodeComapare(field, order)); - } - - public static void printPathWithLinks(String dir, JspWriter out, - int namenodeInfoPort, - String tokenString, - String nnAddress - ) throws IOException { - try { - String[] parts = dir.split(Path.SEPARATOR); - StringBuilder tempPath = new StringBuilder(dir.length()); - out.print("<a href=\"browseDirectory.jsp" + "?dir="+ Path.SEPARATOR - + "&namenodeInfoPort=" + namenodeInfoPort - + getDelegationTokenUrlParam(tokenString) - + getUrlParam(NAMENODE_ADDRESS, nnAddress) + "\">" + Path.SEPARATOR - + "</a>"); - tempPath.append(Path.SEPARATOR); - for (int i = 0; i < parts.length-1; i++) { - if (!parts[i].equals("")) { - tempPath.append(parts[i]); - out.print("<a href=\"browseDirectory.jsp" + "?dir=" - + HtmlQuoting.quoteHtmlChars(tempPath.toString()) + "&namenodeInfoPort=" + namenodeInfoPort - + getDelegationTokenUrlParam(tokenString) - + getUrlParam(NAMENODE_ADDRESS, nnAddress)); - out.print("\">" + HtmlQuoting.quoteHtmlChars(parts[i]) + "</a>" + Path.SEPARATOR); - tempPath.append(Path.SEPARATOR); - } - } - if(parts.length > 0) { - out.print(HtmlQuoting.quoteHtmlChars(parts[parts.length-1])); - } - } - catch (UnsupportedEncodingException ex) { - ex.printStackTrace(); - } - } - - public static void printGotoForm(JspWriter out, - int namenodeInfoPort, - String tokenString, - String file, - String nnAddress) throws IOException { - out.print("<form action=\"browseDirectory.jsp\" method=\"get\" name=\"goto\">"); - out.print("Goto : "); - out.print("<input name=\"dir\" type=\"text\" width=\"50\" id=\"dir\" value=\""+ HtmlQuoting.quoteHtmlChars(file)+"\"/>"); - out.print("<input name=\"go\" type=\"submit\" value=\"go\"/>"); - out.print("<input name=\"namenodeInfoPort\" type=\"hidden\" " - + "value=\"" + namenodeInfoPort + "\"/>"); - if (UserGroupInformation.isSecurityEnabled()) { - out.print("<input name=\"" + DELEGATION_PARAMETER_NAME - + "\" type=\"hidden\" value=\"" + tokenString + "\"/>"); - } - out.print("<input name=\""+ NAMENODE_ADDRESS +"\" type=\"hidden\" " - + "value=\"" + nnAddress + "\"/>"); - out.print("</form>"); - } - - public static void createTitle(JspWriter out, - HttpServletRequest req, - String file) throws IOException{ - if(file == null) file = ""; - int start = Math.max(0,file.length() - 100); - if(start != 0) - file = "..." + file.substring(start, file.length()); - out.print("<title>HDFS:" + file + "</title>"); - } - - /** Convert a String to chunk-size-to-view. */ - public static int string2ChunkSizeToView(String s, int defaultValue) { - int n = s == null? 0: Integer.parseInt(s); - return n > 0? n: defaultValue; - } - - /** Return a table containing version information. */ - public static String getVersionTable() { - return "<div class='dfstable'><table>" - + "\n <tr><td class='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision() + "</td></tr>" - + "\n <tr><td class='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch() + "</td></tr>" - + "\n</table></div>"; - } - - /** - * Validate filename. - * @return null if the filename is invalid. - * Otherwise, return the validated filename. - */ - public static String validatePath(String p) { - return p == null || p.length() == 0? - null: new Path(p).toUri().getPath(); - } - - /** - * Validate a long value. - * @return null if the value is invalid. - * Otherwise, return the validated Long object. 
- */ - public static Long validateLong(String value) { - return value == null? null: Long.parseLong(value); - } - - /** - * Validate a URL. - * @return null if the value is invalid. - * Otherwise, return the validated URL String. - */ - public static String validateURL(String value) { - try { - return URLEncoder.encode(new URL(value).toString(), "UTF-8"); - } catch (IOException e) { - return null; - } - } - - /** - * If security is turned off, what is the default web user? - * @param conf the configuration to look in - * @return the remote user that was configuration - */ - public static UserGroupInformation getDefaultWebUser(Configuration conf - ) throws IOException { - return UserGroupInformation.createRemoteUser(getDefaultWebUserName(conf)); - } + private JspHelper() {} private static String getDefaultWebUserName(Configuration conf ) throws IOException { @@ -690,10 +191,10 @@ public class JspHelper { // honor the X-Forwarded-For header set by a configured set of trusted // proxy servers. allows audit logging and proxy user checks to work // via an http proxy - static String getRemoteAddr(HttpServletRequest request) { + public static String getRemoteAddr(HttpServletRequest request) { String remoteAddr = request.getRemoteAddr(); String proxyHeader = request.getHeader("X-Forwarded-For"); - if (proxyHeader != null && ProxyUsers.isProxyServer(remoteAddr)) { + if (proxyHeader != null && ProxyServers.isProxyServer(remoteAddr)) { final String clientAddr = proxyHeader.split(",")[0].trim(); if (!clientAddr.isEmpty()) { remoteAddr = clientAddr; @@ -736,56 +237,4 @@ public class JspHelper { return username; } - /** - * Returns the url parameter for the given token string. - * @param tokenString - * @return url parameter - */ - public static String getDelegationTokenUrlParam(String tokenString) { - if (tokenString == null ) { - return ""; - } - if (UserGroupInformation.isSecurityEnabled()) { - return SET_DELEGATION + tokenString; - } else { - return ""; - } - } - - /** - * Returns the url parameter for the given string, prefixed with - * paramSeparator. - * - * @param name parameter name - * @param val parameter value - * @param paramSeparator URL parameter prefix, i.e. either '?' or '&' - * @return url parameter - */ - public static String getUrlParam(String name, String val, String paramSeparator) { - return val == null ? "" : paramSeparator + name + "=" + val; - } - - /** - * Returns the url parameter for the given string, prefixed with '?' if - * firstParam is true, prefixed with '&' if firstParam is false. - * - * @param name parameter name - * @param val parameter value - * @param firstParam true if this is the first parameter in the list, false otherwise - * @return url parameter - */ - public static String getUrlParam(String name, String val, boolean firstParam) { - return getUrlParam(name, val, firstParam ? "?" : "&"); - } - - /** - * Returns the url parameter for the given string, prefixed with '&'. 
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Tue Aug 19 23:49:39 2014
@@ -831,10 +831,10 @@ public abstract class Storage extends St
   }

   /**
-   * Checks if the upgrade from the given old version is supported. If
-   * no upgrade is supported, it throws IncorrectVersionException.
-   *
-   * @param oldVersion
+   * Checks if the upgrade from {@code oldVersion} is supported.
+   * @param oldVersion the version of the metadata to check with the current
+   *                   version
+   * @throws IOException if upgrade is not supported
    */
   public static void checkVersionUpgradable(int oldVersion)
                              throws IOException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Tue Aug 19 23:49:39 2014
@@ -148,8 +148,8 @@ public class StorageInfo {
    * Get common storage fields.
    * Should be overloaded if additional fields need to be get.
    *
-   * @param props
-   * @throws IOException
+   * @param props properties
+   * @throws IOException on error
    */
   protected void setFieldsFromProperties(
       Properties props, StorageDirectory sd) throws IOException {
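The reworded javadoc for checkVersionUpgradable now states the contract explicitly: the method throws rather than returning a status. A hedged sketch of what such a layout-version gate can look like; the cutoff constant and message are illustrative, though HDFS layout versions really are negative and decrease as the on-disk format evolves:

import java.io.IOException;

public class VersionGateSketch {
  // Illustrative cutoff: oldest layout version an upgrade can start from.
  static final int LAST_UPGRADABLE_LAYOUT_VERSION = -16;

  /** Throws IOException if upgrading from oldVersion is not supported. */
  static void checkVersionUpgradable(int oldVersion) throws IOException {
    // Layout versions are negative, so "older than the cutoff" means
    // numerically greater than it.
    if (oldVersion > LAST_UPGRADABLE_LAYOUT_VERSION) {
      throw new IOException("Upgrade from layout version " + oldVersion
          + " is not supported; upgrade via an intermediate release first.");
    }
  }
}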
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
    */
   private long lastActiveClaimTxId = -1;

+  private final ReentrantReadWriteLock mReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock mReadLock = mReadWriteLock.readLock();
+  private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    mReadLock.lock();
+  }
+
+  void readUnlock() {
+    mReadLock.unlock();
+  }
+
+  void writeLock() {
+    mWriteLock.lock();
+  }
+
+  void writeUnlock() {
+    mWriteLock.unlock();
+  }
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
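The fields and helpers added here replace method-level synchronized with a ReentrantReadWriteLock, so the read-mostly accessors converted below (getBlockPoolId, getNamespaceInfo, getActiveNN, toString) can run concurrently while mutators take the exclusive lock. A minimal self-contained illustration of the same guard pattern; the class and its field are hypothetical, not from the patch:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteGuardSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();
  private final Lock writeLock = rwLock.writeLock();
  private String state; // read-mostly field, analogous to bpNSInfo

  String getState() {
    readLock.lock();        // many readers may hold this concurrently
    try {
      return state;
    } finally {
      readLock.unlock();    // always release in finally
    }
  }

  void setState(String newState) {
    writeLock.lock();       // exclusive: blocks readers and other writers
    try {
      state = newState;
    } finally {
      writeLock.unlock();
    }
  }
}

Each converted method below wraps its body in lock()/try/finally exactly this way, which guarantees the lock is released even when the body returns early or throws.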
@@ -135,38 +160,57 @@ class BPOfferService {
     }
     return false;
   }
-
-  synchronized String getBlockPoolId() {
-    if (bpNSInfo != null) {
-      return bpNSInfo.getBlockPoolID();
-    } else {
-      LOG.warn("Block pool ID needed, but service not yet registered with NN",
-          new Exception("trace"));
-      return null;
+
+  String getBlockPoolId() {
+    readLock();
+    try {
+      if (bpNSInfo != null) {
+        return bpNSInfo.getBlockPoolID();
+      } else {
+        LOG.warn("Block pool ID needed, but service not yet registered with NN",
+            new Exception("trace"));
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
-
-  synchronized NamespaceInfo getNamespaceInfo() {
-    return bpNSInfo;
+
+  boolean hasBlockPoolId() {
+    return getNamespaceInfo() != null;
+  }
+
+  NamespaceInfo getNamespaceInfo() {
+    readLock();
+    try {
+      return bpNSInfo;
+    } finally {
+      readUnlock();
+    }
   }

   @Override
-  public synchronized String toString() {
-    if (bpNSInfo == null) {
-      // If we haven't yet connected to our NN, we don't yet know our
-      // own block pool ID.
-      // If _none_ of the block pools have connected yet, we don't even
-      // know the DatanodeID ID of this DN.
-      String datanodeUuid = dn.getDatanodeUuid();
+  public String toString() {
+    readLock();
+    try {
+      if (bpNSInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the DatanodeID ID of this DN.
+        String datanodeUuid = dn.getDatanodeUuid();

-      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
-        datanodeUuid = "unassigned";
+        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+          datanodeUuid = "unassigned";
+        }
+        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+      } else {
+        return "Block pool " + getBlockPoolId() +
+            " (Datanode Uuid " + dn.getDatanodeUuid() +
+            ")";
       }
-      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
-    } else {
-      return "Block pool " + getBlockPoolId() +
-          " (Datanode Uuid " + dn.getDatanodeUuid() +
-          ")";
+    } finally {
+      readUnlock();
     }
   }
@@ -262,32 +306,37 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
-    if (this.bpNSInfo == null) {
-      this.bpNSInfo = nsInfo;
-      boolean success = false;
-
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      try {
-        dn.initBlockPool(this);
-        success = true;
-      } finally {
-        if (!success) {
-          // The datanode failed to initialize the BP. We need to reset
-          // the namespace info so that other BPService actors still have
-          // a chance to set it, and re-initialize the datanode.
-          this.bpNSInfo = null;
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    writeLock();
+    try {
+      if (this.bpNSInfo == null) {
+        this.bpNSInfo = nsInfo;
+        boolean success = false;
+
+        // Now that we know the namespace ID, etc, we can pass this to the DN.
+        // The DN can now initialize its local storage if we are the
+        // first BP to handshake, etc.
+        try {
+          dn.initBlockPool(this);
+          success = true;
+        } finally {
+          if (!success) {
+            // The datanode failed to initialize the BP. We need to reset
+            // the namespace info so that other BPService actors still have
+            // a chance to set it, and re-initialize the datanode.
+            this.bpNSInfo = null;
+          }
         }
+      } else {
+        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+            "Blockpool ID");
+        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+            "Namespace ID");
+        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+            "Cluster ID");
       }
-    } else {
-      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
-          "Blockpool ID");
-      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
-          "Namespace ID");
-      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
-          "Cluster ID");
+    } finally {
+      writeUnlock();
     }
   }

@@ -296,22 +345,27 @@ class BPOfferService {
    * NN, it calls this function to verify that the NN it connected to
    * is consistent with other NNs serving the block-pool.
    */
-  synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+  void registrationSucceeded(BPServiceActor bpServiceActor,
       DatanodeRegistration reg) throws IOException {
-    if (bpRegistration != null) {
-      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
-          reg.getStorageInfo().getNamespaceID(), "namespace ID");
-      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
-          reg.getStorageInfo().getClusterID(), "cluster ID");
-    } else {
-      bpRegistration = reg;
-    }
-
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-    // Add the initial block token secret keys to the DN's secret manager.
-    if (dn.isBlockTokenEnabled) {
-      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-          reg.getExportedKeys());
+    writeLock();
+    try {
+      if (bpRegistration != null) {
+        checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+            reg.getStorageInfo().getNamespaceID(), "namespace ID");
+        checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+            reg.getStorageInfo().getClusterID(), "cluster ID");
+      } else {
+        bpRegistration = reg;
+      }
+
+      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+      // Add the initial block token secret keys to the DN's secret manager.
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+            reg.getExportedKeys());
+      }
+    } finally {
+      writeUnlock();
     }
   }

@@ -329,25 +383,35 @@ class BPOfferService {
     }
   }

-  synchronized DatanodeRegistration createRegistration() {
-    Preconditions.checkState(bpNSInfo != null,
-        "getRegistration() can only be called after initial handshake");
-    return dn.createBPRegistration(bpNSInfo);
+  DatanodeRegistration createRegistration() {
+    writeLock();
+    try {
+      Preconditions.checkState(bpNSInfo != null,
+          "getRegistration() can only be called after initial handshake");
+      return dn.createBPRegistration(bpNSInfo);
+    } finally {
+      writeUnlock();
+    }
   }

   /**
    * Called when an actor shuts down. If this is the last actor
    * to shut down, shuts down the whole blockpool in the DN.
    */
-  synchronized void shutdownActor(BPServiceActor actor) {
-    if (bpServiceToActive == actor) {
-      bpServiceToActive = null;
-    }
+  void shutdownActor(BPServiceActor actor) {
+    writeLock();
+    try {
+      if (bpServiceToActive == actor) {
+        bpServiceToActive = null;
+      }

-    bpServices.remove(actor);
+      bpServices.remove(actor);

-    if (bpServices.isEmpty()) {
-      dn.shutdownBlockPool(this);
+      if (bpServices.isEmpty()) {
+        dn.shutdownBlockPool(this);
+      }
+    } finally {
+      writeUnlock();
     }
   }

@@ -388,11 +452,16 @@ class BPOfferService {
    * @return a proxy to the active NN, or null if the BPOS has not
    * acknowledged any NN as active yet.
    */
-  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
-    if (bpServiceToActive != null) {
-      return bpServiceToActive.bpNamenode;
-    } else {
-      return null;
+  DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    readLock();
+    try {
+      if (bpServiceToActive != null) {
+        return bpServiceToActive.bpNamenode;
+      } else {
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }

@@ -420,45 +489,50 @@ class BPOfferService {
    * @param actor the actor which received the heartbeat
    * @param nnHaState the HA-related heartbeat contents
    */
-  synchronized void updateActorStatesFromHeartbeat(
+  void updateActorStatesFromHeartbeat(
       BPServiceActor actor,
       NNHAStatusHeartbeat nnHaState) {
-    final long txid = nnHaState.getTxId();
-
-    final boolean nnClaimsActive =
-      nnHaState.getState() == HAServiceState.ACTIVE;
-    final boolean bposThinksActive = bpServiceToActive == actor;
-    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
-
-    if (nnClaimsActive && !bposThinksActive) {
-      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
-          "txid=" + txid);
-      if (!isMoreRecentClaim) {
-        // Split-brain scenario - an NN is trying to claim active
-        // state when a different NN has already claimed it with a higher
-        // txid.
-        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
-            txid + " but there was already a more recent claim at txid=" +
-            lastActiveClaimTxId);
-        return;
-      } else {
-        if (bpServiceToActive == null) {
-          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+    writeLock();
+    try {
+      final long txid = nnHaState.getTxId();
+
+      final boolean nnClaimsActive =
+          nnHaState.getState() == HAServiceState.ACTIVE;
+      final boolean bposThinksActive = bpServiceToActive == actor;
+      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+      if (nnClaimsActive && !bposThinksActive) {
+        LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+            "txid=" + txid);
+        if (!isMoreRecentClaim) {
+          // Split-brain scenario - an NN is trying to claim active
+          // state when a different NN has already claimed it with a higher
+          // txid.
+          LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+              txid + " but there was already a more recent claim at txid=" +
+              lastActiveClaimTxId);
+          return;
        } else {
-        LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
-            bpServiceToActive + " at higher txid=" + txid);
+          if (bpServiceToActive == null) {
+            LOG.info("Acknowledging ACTIVE Namenode " + actor);
+          } else {
+            LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+                bpServiceToActive + " at higher txid=" + txid);
+          }
+          bpServiceToActive = actor;
        }
-      bpServiceToActive = actor;
+      } else if (!nnClaimsActive && bposThinksActive) {
+        LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+            "txid=" + nnHaState.getTxId());
+        bpServiceToActive = null;
      }
-    } else if (!nnClaimsActive && bposThinksActive) {
-      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
-          "txid=" + nnHaState.getTxId());
-      bpServiceToActive = null;
-    }
-
-    if (bpServiceToActive == actor) {
-      assert txid >= lastActiveClaimTxId;
-      lastActiveClaimTxId = txid;
+
+      if (bpServiceToActive == actor) {
+        assert txid >= lastActiveClaimTxId;
+        lastActiveClaimTxId = txid;
+      }
+    } finally {
+      writeUnlock();
     }
   }
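The claim-arbitration logic above is unchanged by the locking conversion: an actor's ACTIVE claim is accepted only when its transaction id is newer than the last accepted claim. A condensed, hypothetical reduction of that rule, with String names standing in for BPServiceActor references:

public class ActiveClaimSketch {
  private long lastActiveClaimTxId = -1;
  private String activeNN; // stand-in for bpServiceToActive

  /** Accepts the claim only if it is not stale; returns whether it won. */
  synchronized boolean tryClaimActive(String nn, long txid) {
    if (!nn.equals(activeNN) && txid <= lastActiveClaimTxId) {
      return false; // split-brain guard: a newer claim already exists
    }
    activeNN = nn;
    lastActiveClaimTxId = Math.max(lastActiveClaimTxId, txid);
    return true;
  }

  public static void main(String[] args) {
    ActiveClaimSketch s = new ActiveClaimSketch();
    System.out.println(s.tryClaimActive("nn1", 100)); // true: first claim
    System.out.println(s.tryClaimActive("nn2", 90));  // false: stale claim
    System.out.println(s.tryClaimActive("nn2", 120)); // true: newer claim
  }
}

Using the namenode's own transaction id as the tiebreaker means the datanode needs no extra coordination: whichever NN has seen more of the edit log wins the claim.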
@@ -527,14 +601,17 @@ class BPOfferService {
       LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
           + " with " + actor.state + " state");
       actor.reRegister();
-      return true;
+      return false;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       if (actor == bpServiceToActive) {
         return processCommandFromActive(cmd, actor);
       } else {
         return processCommandFromStandby(cmd, actor);
       }
+    } finally {
+      writeUnlock();
     }
   }

@@ -571,7 +648,8 @@ class BPOfferService {
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
       // Send a copy of a block to another datanode
-      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
+          bcmd.getTargets(), bcmd.getTargetStorageTypes());
       dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_INVALIDATE:
@@ -679,4 +757,17 @@ class BPOfferService {
     return true;
   }

+  /*
+   * Let the actor retry for initialization until all namenodes of cluster have
+   * failed.
+   */
+  boolean shouldRetryInit() {
+    if (hasBlockPoolId()) {
+      // One of the namenode registered successfully. lets continue retry for
+      // other.
+      return true;
+    }
+    return isAlive();
+  }
+
 }
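The new shouldRetryInit() encodes the whole retry policy in two branches. Spelled out as a hypothetical standalone sketch, with boolean parameters mirroring the hasBlockPoolId() and isAlive() calls the real method makes:

public class RetryInitSketch {
  /**
   * Keep retrying initialization while some namenode of this block pool has
   * already registered (the block pool id is known), or, failing that, while
   * the actor itself is still considered alive.
   */
  static boolean shouldRetryInit(boolean hasBlockPoolId, boolean isAlive) {
    if (hasBlockPoolId) {
      return true;  // one NN succeeded; keep retrying the others
    }
    return isAlive; // no NN succeeded yet; retry only while still healthy
  }

  public static void main(String[] args) {
    System.out.println(shouldRetryInit(true, false));  // true
    System.out.println(shouldRetryInit(false, true));  // true
    System.out.println(shouldRetryInit(false, false)); // false: give up
  }
}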
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Tue Aug 19 23:49:39 2014
@@ -90,8 +90,13 @@ class BPServiceActor implements Runnable
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private volatile long lastHeartbeat = 0;
-  private volatile boolean initialized = false;
-
+
+  static enum RunningState {
+    CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
+  }
+
+  private volatile RunningState runningState = RunningState.CONNECTING;
+
   /**
    * Between block reports (which happen on the order of once an hour) the
    * DN reports smaller incremental changes to its block list. This map,
@@ -118,17 +123,12 @@ class BPServiceActor implements Runnable
     this.dnConf = dn.getDnConf();
   }

-  /**
-   * returns true if BP thread has completed initialization of storage
-   * and has registered with the corresponding namenode
-   * @return true if initialized
-   */
-  boolean isInitialized() {
-    return initialized;
-  }
-
   boolean isAlive() {
-    return shouldServiceRun && bpThread.isAlive();
+    if (!shouldServiceRun || !bpThread.isAlive()) {
+      return false;
+    }
+    return runningState == BPServiceActor.RunningState.RUNNING
+        || runningState == BPServiceActor.RunningState.CONNECTING;
   }

   @Override
@@ -222,7 +222,19 @@ class BPServiceActor implements Runnable
       // Second phase of the handshake with the NN.
       register();
     }
-
+
+  // This is useful to make sure NN gets Heartbeat before Blockreport
+  // upon NN restart while DN keeps retrying Otherwise,
+  // 1. NN restarts.
+  // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
+  // 3. After reregistration completes, DN will send Blockreport first.
+  // 4. Given NN receives Blockreport after Heartbeat, it won't mark
+  //    DatanodeStorageInfo#blockContentsStale to false until the next
+  //    Blockreport.
+  void scheduleHeartbeat() {
+    lastHeartbeat = 0;
+  }
+
   /**
    * This methods arranges for the data node to send the block report at
    * the next heartbeat.
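scheduleHeartbeat() forces a heartbeat on the next offerService() iteration simply by zeroing lastHeartbeat: any realistic clock reading is then at least one interval away. A self-contained sketch of that trick; the interval and class name are illustrative:

public class HeartbeatScheduleSketch {
  private volatile long lastHeartbeat = 0;
  private final long heartbeatIntervalMs = 3000;

  /** True when the elapsed time since the last heartbeat exceeds the interval. */
  boolean heartbeatDue(long nowMs) {
    return nowMs - lastHeartbeat >= heartbeatIntervalMs;
  }

  void scheduleHeartbeat() {
    lastHeartbeat = 0; // any realistic "now" is >= one interval away from 0
  }

  public static void main(String[] args) {
    HeartbeatScheduleSketch s = new HeartbeatScheduleSketch();
    long now = System.currentTimeMillis();
    s.lastHeartbeat = now;
    System.out.println(s.heartbeatDue(now)); // false: just heartbeated
    s.scheduleHeartbeat();
    System.out.println(s.heartbeatDue(now)); // true: forced due
  }
}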
@@ -314,9 +326,7 @@ class BPServiceActor implements Runnable
   }

   /**
-   * Retrieve the incremental BR state for a given storage UUID
-   * @param storageUuid
-   * @return
+   * @return pending incremental block report for given {@code storage}
    */
   private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
       DatanodeStorage storage) {
@@ -339,8 +349,6 @@ class BPServiceActor implements Runnable
    * exists for the same block it is removed.
    *
    * Caller must synchronize access using pendingIncrementalBRperStorage.
-   * @param bInfo
-   * @param storageUuid
    */
   void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
       DatanodeStorage storage) {
@@ -809,19 +817,30 @@ class BPServiceActor implements Runnable
     LOG.info(this + " starting to offer service");

     try {
-      // init stuff
-      try {
-        // setup storage
-        connectToNNAndHandshake();
-      } catch (IOException ioe) {
-        // Initial handshake, storage recovery or registration failed
-        // End BPOfferService thread
-        LOG.fatal("Initialization failed for block pool " + this, ioe);
-        return;
+      while (true) {
+        // init stuff
+        try {
+          // setup storage
+          connectToNNAndHandshake();
+          break;
+        } catch (IOException ioe) {
+          // Initial handshake, storage recovery or registration failed
+          runningState = RunningState.INIT_FAILED;
+          if (shouldRetryInit()) {
+            // Retry until all namenode's of BPOS failed initialization
+            LOG.error("Initialization failed for " + this + " "
+                + ioe.getLocalizedMessage());
+            sleepAndLogInterrupts(5000, "initializing");
+          } else {
+            runningState = RunningState.FAILED;
+            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
+            return;
+          }
+        }
       }

-      initialized = true; // bp is initialized;
-
+      runningState = RunningState.RUNNING;
+
       while (shouldRun()) {
         try {
           offerService();
@@ -830,14 +849,20 @@ class BPServiceActor implements Runnable
           sleepAndLogInterrupts(5000, "offering service");
         }
       }
+      runningState = RunningState.EXITED;
     } catch (Throwable ex) {
       LOG.warn("Unexpected exception in block pool " + this, ex);
+      runningState = RunningState.FAILED;
     } finally {
       LOG.warn("Ending block pool service for: " + this);
       cleanUp();
     }
   }

+  private boolean shouldRetryInit() {
+    return shouldRun() && bpos.shouldRetryInit();
+  }
+
   private boolean shouldRun() {
     return shouldServiceRun && dn.shouldRun();
   }
@@ -889,6 +914,7 @@ class BPServiceActor implements Runnable
     retrieveNamespaceInfo();
     // and re-register
     register();
+    scheduleHeartbeat();
   }
 }
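Taken together, the run() changes turn one-shot initialization into a retry loop driven by the new RunningState enum. A condensed, hypothetical version of that loop, with connectToNN() and shouldRetryInit() as stand-ins for the real handshake and retry policy:

enum RunningState { CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED }

public class InitRetrySketch {
  private volatile RunningState runningState = RunningState.CONNECTING;

  void runInitLoop() throws InterruptedException {
    while (true) {
      try {
        connectToNN();        // handshake + registration with the NN
        break;                // success: leave the retry loop
      } catch (Exception e) {
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          Thread.sleep(5000); // back off before the next attempt
        } else {
          runningState = RunningState.FAILED;
          return;             // all namenodes failed: give up
        }
      }
    }
    runningState = RunningState.RUNNING;
  }

  // Stand-ins for the real handshake and retry policy.
  void connectToNN() throws Exception {}
  boolean shouldRetryInit() { return true; }

  public static void main(String[] args) throws InterruptedException {
    InitRetrySketch a = new InitRetrySketch();
    a.runInitLoop();
    System.out.println(a.runningState); // RUNNING: handshake succeeded
  }
}

The explicit states let isAlive() distinguish an actor that is still connecting (worth waiting for) from one that has permanently failed, which is what the shouldRetryInit() policy in BPOfferService consults.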