Author: szetszwo
Date: Wed Jul 23 17:25:06 2014
New Revision: 1612880

URL: http://svn.apache.org/r1612880
Log:
HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage types are unavailable.
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1612880&r1=1612879&r2=1612880&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jul 23 17:25:06 2014 @@ -17,6 +17,9 @@ HDFS-6584: Archival Storage HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files. (vinayakumarb via szetszwo) + HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage + types are unavailable. 
(szetszwo) + Trunk (Unreleased) INCOMPATIBLE CHANGES Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1612880&r1=1612879&r2=1612880&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Jul 23 17:25:06 2014 @@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Tim import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -218,7 +219,8 @@ public class BlockPlacementPolicyDefault boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, + EnumSet.noneOf(StorageType.class), results.isEmpty()); if (!returnChosenNodes) { results.removeAll(chosenStorage); } @@ -238,7 +240,40 @@ public class BlockPlacementPolicyDefault int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; return new int[] {numOfReplicas, maxNodesPerRack}; } - + + private static List<StorageType> selectStorageTypes( + final BlockStoragePolicy storagePolicy, + final short replication, + final Iterable<StorageType> chosen, + final EnumSet<StorageType> unavailableStorages, 
+ final boolean isNewBlock) { + final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes( + replication, chosen); + final List<StorageType> removed = new ArrayList<StorageType>(); + for(int i = storageTypes.size() - 1; i >= 0; i--) { + // replace/remove unavailable storage types. + final StorageType t = storageTypes.get(i); + if (unavailableStorages.contains(t)) { + final StorageType fallback = isNewBlock? + storagePolicy.getCreationFallback(unavailableStorages) + : storagePolicy.getReplicationFallback(unavailableStorages); + if (fallback == null) { + removed.add(storageTypes.remove(i)); + } else { + storageTypes.set(i, fallback); + } + } + } + if (storageTypes.size() < replication) { + LOG.warn("Failed to place enough replicas: replication is " + replication + + " but only " + storageTypes.size() + " storage types can be selected " + + "(selected=" + storageTypes + + ", unavailable=" + unavailableStorages + + ", removed=" + removed + + ", policy=" + storagePolicy + ")"); + } + return storageTypes; + } /** * choose <i>numOfReplicas</i> from all data nodes * @param numOfReplicas additional number of replicas wanted @@ -257,14 +292,14 @@ public class BlockPlacementPolicyDefault final int maxNodesPerRack, final List<DatanodeStorageInfo> results, final boolean avoidStaleNodes, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + final EnumSet<StorageType> unavailableStorages, + final boolean newBlock) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return writer; } - int totalReplicasExpected = numOfReplicas + results.size(); - - int numOfResults = results.size(); - boolean newBlock = (numOfResults==0); + final int numOfResults = results.size(); + final int totalReplicasExpected = numOfReplicas + numOfResults; if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) { writer = results.get(0).getDatanodeDescriptor(); } @@ -272,12 +307,25 @@ public class BlockPlacementPolicyDefault // 
Keep a copy of original excludedNodes final Set<Node> oldExcludedNodes = avoidStaleNodes ? new HashSet<Node>(excludedNodes) : null; - final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes( - (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results)); + + // choose storage types; use fallbacks for unavailable storages + final List<StorageType> storageTypes = selectStorageTypes(storagePolicy, + (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results), + unavailableStorages, newBlock); + + StorageType curStorageType = null; try { + if ((numOfReplicas = storageTypes.size()) == 0) { + throw new NotEnoughReplicasException( + "All required storage types are unavailable: " + + " unavailableStorages=" + unavailableStorages + + ", storagePolicy=" + storagePolicy); + } + if (numOfResults == 0) { + curStorageType = storageTypes.remove(0); writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true) + maxNodesPerRack, results, avoidStaleNodes, curStorageType, true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -285,30 +333,33 @@ public class BlockPlacementPolicyDefault } final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); if (numOfResults <= 1) { + curStorageType = storageTypes.remove(0); chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); if (--numOfReplicas == 0) { return writer; } } if (numOfResults <= 2) { final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor(); + curStorageType = storageTypes.remove(0); if (clusterMap.isOnSameRack(dn0, dn1)) { chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } else if (newBlock){ chooseLocalRack(dn1, excludedNodes, blocksize, 
maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } else { chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } if (--numOfReplicas == 0) { return writer; } } + curStorageType = storageTypes.remove(0); chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0)); + maxNodesPerRack, results, avoidStaleNodes, curStorageType); } catch (NotEnoughReplicasException e) { final String message = "Failed to place enough replicas, still in need of " + (totalReplicasExpected - results.size()) + " to reach " @@ -333,7 +384,16 @@ public class BlockPlacementPolicyDefault // if the NotEnoughReplicasException was thrown in chooseRandom(). numOfReplicas = totalReplicasExpected - results.size(); return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, - maxNodesPerRack, results, false, storagePolicy); + maxNodesPerRack, results, false, storagePolicy, unavailableStorages, + newBlock); + } + + if (storageTypes.size() > 0) { + // Retry chooseTarget with fallback storage types + unavailableStorages.add(curStorageType); + return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize, + maxNodesPerRack, results, false, storagePolicy, unavailableStorages, + newBlock); } } return writer;