Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Thu Aug 7 07:38:23 2014 @@ -33,6 +33,9 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; import java.io.IOException; import java.io.PrintStream; @@ -1531,16 +1534,38 @@ public class DFSUtil { .needsClientAuth( sslConf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY, DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT)) - .keyPassword(sslConf.get("ssl.server.keystore.keypassword")) + .keyPassword(getPassword(sslConf, DFS_SERVER_HTTPS_KEYPASSWORD_KEY)) .keyStore(sslConf.get("ssl.server.keystore.location"), - sslConf.get("ssl.server.keystore.password"), + getPassword(sslConf, DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY), sslConf.get("ssl.server.keystore.type", "jks")) .trustStore(sslConf.get("ssl.server.truststore.location"), - sslConf.get("ssl.server.truststore.password"), + getPassword(sslConf, DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY), sslConf.get("ssl.server.truststore.type", "jks")); } /** + * Leverages the Configuration.getPassword method to attempt to get + * passwords from the CredentialProvider API before falling back to + * clear text in config - if falling back is allowed. + * @param conf Configuration instance + * @param alias name of the credential to retrieve + * @return String credential value or null + */ + static String getPassword(Configuration conf, String alias) { + String password = null; + try { + char[] passchars = conf.getPassword(alias); + if (passchars != null) { + password = new String(passchars); + } + } + catch (IOException ioe) { + password = null; + } + return password; + } + + /** * Converts a Date into an ISO-8601 formatted datetime string. */ public static String dateToIso8601String(Date date) {
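The getPassword helper above defers to Configuration.getPassword, which consults any credential providers registered via the standard provider-path key before falling back to the clear-text value in the config file. A minimal sketch of that lookup path, assuming a JCEKS keystore provider; the provider URI, alias, and fallback value here are illustrative and not part of this commit:

    import org.apache.hadoop.conf.Configuration;

    public class SslPasswordLookup {
      public static void main(String[] args) throws Exception {
        Configuration sslConf = new Configuration(false);
        // Point Configuration.getPassword at a credential store first
        // (hypothetical /tmp path; jceks://file is a standard scheme).
        sslConf.set("hadoop.security.credential.provider.path",
            "jceks://file/tmp/ssl.jceks");
        // Clear-text fallback, used only if no provider holds the alias.
        sslConf.set("ssl.server.keystore.password", "clearTextFallback");

        char[] pass = sslConf.getPassword("ssl.server.keystore.password");
        System.out.println(pass == null ? "null" : new String(pass));
      }
    }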
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Thu Aug 7 07:38:23 2014 @@ -50,6 +50,9 @@ public class Block implements Writable, public static final Pattern metaFilePattern = Pattern .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION + "$"); + public static final Pattern metaOrBlockFilePattern = Pattern + .compile(BLOCK_FILE_PREFIX + "(-??\\d++)(_(\\d++)\\" + METADATA_EXTENSION + + ")?$"); public static boolean isBlockFilename(File f) { String name = f.getName(); @@ -65,6 +68,11 @@ public class Block implements Writable, return metaFilePattern.matcher(name).matches(); } + public static File metaToBlockFile(File metaFile) { + return new File(metaFile.getParent(), metaFile.getName().substring( + 0, metaFile.getName().lastIndexOf('_'))); + } + /** * Get generation stamp from the name of the metafile name */ @@ -75,10 +83,10 @@ public class Block implements Writable, } /** - * Get the blockId from the name of the metafile name + * Get the blockId from the name of the meta or block file */ - public static long getBlockId(String metaFile) { - Matcher m = metaFilePattern.matcher(metaFile); + public static long getBlockId(String metaOrBlockFile) { + Matcher m = metaOrBlockFilePattern.matcher(metaOrBlockFile); return m.matches() ? Long.parseLong(m.group(1)) : 0; }
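To see what the widened pattern accepts, here is a small self-contained sketch that mirrors metaOrBlockFilePattern and the new getBlockId; the "blk_" prefix and ".meta" extension are inlined, and the file names are invented for illustration:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BlockNameDemo {
      // Same shape as metaOrBlockFilePattern above: the "_<genstamp>.meta"
      // suffix is optional, so plain block files match as well.
      static final Pattern META_OR_BLOCK =
          Pattern.compile("blk_(-??\\d++)(_(\\d++)\\.meta)?$");

      static long getBlockId(String name) {
        Matcher m = META_OR_BLOCK.matcher(name);
        return m.matches() ? Long.parseLong(m.group(1)) : 0;
      }

      public static void main(String[] args) {
        System.out.println(getBlockId("blk_1073741825"));           // 1073741825
        System.out.println(getBlockId("blk_1073741825_1001.meta")); // 1073741825
        System.out.println(getBlockId("blk_1073741825_1001"));      // 0, no match
      }
    }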
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Aug 7 07:38:23 2014 @@ -58,6 +58,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -85,7 +86,6 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.net.Node; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.StringUtils; @@ -195,10 +195,12 @@ import com.google.common.base.Preconditi @InterfaceAudience.Private public class Balancer { static final Log LOG = LogFactory.getLog(Balancer.class); - final private static long GB = 1L << 30; //1GB - final private static long MAX_SIZE_TO_MOVE = 10*GB; - final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; - private static long WIN_WIDTH = 5400*1000L; // 1.5 hour + + private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); + + private static final long GB = 1L << 30; //1GB + private static final long MAX_SIZE_TO_MOVE = 10*GB; + private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; /** The maximum number of concurrent blocks moves for * balancing purpose at a datanode @@ -219,6 +221,8 @@ public class Balancer { + "\tIncludes only the specified datanodes."; private final NameNodeConnector nnc; + private final KeyManager keyManager; + private final BalancingPolicy policy; private final SaslDataTransferClient saslClient; private final double threshold; @@ -241,7 +245,8 @@ public class Balancer { private final Map<Block, BalancerBlock> globalBlockList = new HashMap<Block, BalancerBlock>(); - private final MovedBlocks movedBlocks = new MovedBlocks(); + private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks; + /** Map (datanodeUuid,storageType -> StorageGroup) */ private final StorageGroupMap storageGroupMap = new StorageGroupMap(); @@ -326,7 +331,7 @@ public class Balancer { if (isGoodBlockCandidate(source, target, block)) { this.block = block; if ( chooseProxySource() ) { - movedBlocks.add(block); + movedBlocks.put(block); if (LOG.isDebugEnabled()) { LOG.debug("Decided to move " + this); } @@ -399,10 +404,10 @@ public class Balancer { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); - ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); - Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); + ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); +
Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, nnc, accessToken, target.getDatanode()); + unbufIn, keyManager, accessToken, target.getDatanode()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, @@ -483,47 +488,9 @@ public class Balancer { } /* A class for keeping track of blocks in the Balancer */ - static private class BalancerBlock { - private final Block block; // the block - /** The locations of the replicas of the block. */ - private final List<BalancerDatanode.StorageGroup> locations - = new ArrayList<BalancerDatanode.StorageGroup>(3); - - /* Constructor */ - private BalancerBlock(Block block) { - this.block = block; - } - - /* clean block locations */ - private synchronized void clearLocations() { - locations.clear(); - } - - /* add a location */ - private synchronized void addLocation(BalancerDatanode.StorageGroup g) { - if (!locations.contains(g)) { - locations.add(g); - } - } - - /** @return if the block is located on the given storage group. */ - private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) { - return locations.contains(g); - } - - /* Return its locations */ - private synchronized List<BalancerDatanode.StorageGroup> getLocations() { - return locations; - } - - /* Return the block */ - private Block getBlock() { - return block; - } - - /* Return the length of the block */ - private long getNumBytes() { - return block.getNumBytes(); + static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> { + BalancerBlock(Block block) { + super(block); } } @@ -735,7 +702,7 @@ public class Balancer { */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks( + final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks( getDatanode(), size).getBlocks(); long bytesReceived = 0; @@ -819,7 +786,7 @@ public class Balancer { private void filterMovedBlocks() { for (Iterator<BalancerBlock> blocks=getBlockIterator(); blocks.hasNext();) { - if (movedBlocks.contains(blocks.next())) { + if (movedBlocks.contains(blocks.next().getBlock())) { blocks.remove(); } } @@ -925,6 +892,13 @@ public class Balancer { this.nodesToBeExcluded = p.nodesToBeExcluded; this.nodesToBeIncluded = p.nodesToBeIncluded; this.nnc = theblockpool; + this.keyManager = nnc.getKeyManager(); + + final long movedWinWidth = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); + movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth); + cluster = NetworkTopology.getInstance(conf); this.moverExecutor = Executors.newFixedThreadPool( @@ -1094,36 +1068,6 @@ public class Balancer { LOG.info(items.size() + " " + name + ": " + items); } - /** A matcher interface for matching nodes. */ - private interface Matcher { - /** Given the cluster topology, does the left node match the right node? */ - boolean match(NetworkTopology cluster, Node left, Node right); - } - - /** Match datanodes in the same node group. */ - static final Matcher SAME_NODE_GROUP = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return cluster.isOnSameNodeGroup(left, right); - } - }; - - /** Match datanodes in the same rack. 
*/ - static final Matcher SAME_RACK = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return cluster.isOnSameRack(left, right); - } - }; - - /** Match any datanode with any other datanode. */ - static final Matcher ANY_OTHER = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return left != right; - } - }; - /** * Decide all <source, target> pairs and * the number of bytes to move from a source to a target @@ -1134,13 +1078,13 @@ public class Balancer { private long chooseStorageGroups() { // First, match nodes on the same node group if cluster is node group aware if (cluster.isNodeGroupAware()) { - chooseStorageGroups(SAME_NODE_GROUP); + chooseStorageGroups(Matcher.SAME_NODE_GROUP); } // Then, match nodes on the same rack - chooseStorageGroups(SAME_RACK); + chooseStorageGroups(Matcher.SAME_RACK); // At last, match all remaining nodes - chooseStorageGroups(ANY_OTHER); + chooseStorageGroups(Matcher.ANY_OTHER); Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(), "Mismatched number of datanodes (" + storageGroupMap.size() + " < " @@ -1307,56 +1251,6 @@ public class Balancer { } while (shouldWait); } - /** This window makes sure to keep blocks that have been moved within 1.5 hour. - * Old window has blocks that are older; - * Current window has blocks that are more recent; - * Cleanup method triggers the check if blocks in the old window are - * more than 1.5 hour old. If yes, purge the old window and then - * move blocks in current window to old window. - */ - private static class MovedBlocks { - private long lastCleanupTime = Time.now(); - final private static int CUR_WIN = 0; - final private static int OLD_WIN = 1; - final private static int NUM_WINS = 2; - final private List<HashMap<Block, BalancerBlock>> movedBlocks = - new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS); - - /* initialize the moved blocks collection */ - private MovedBlocks() { - movedBlocks.add(new HashMap<Block,BalancerBlock>()); - movedBlocks.add(new HashMap<Block,BalancerBlock>()); - } - - /* add a block thus marking a block to be moved */ - synchronized private void add(BalancerBlock block) { - movedBlocks.get(CUR_WIN).put(block.getBlock(), block); - } - - /* check if a block is marked as moved */ - synchronized private boolean contains(BalancerBlock block) { - return contains(block.getBlock()); - } - - /* check if a block is marked as moved */ - synchronized private boolean contains(Block block) { - return movedBlocks.get(CUR_WIN).containsKey(block) || - movedBlocks.get(OLD_WIN).containsKey(block); - } - - /* remove old blocks */ - synchronized private void cleanup() { - long curTime = Time.now(); - // check if old win is older than winWidth - if (lastCleanupTime + WIN_WIDTH <= curTime) { - // purge the old window - movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN)); - movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>()); - lastCleanupTime = curTime; - } - } - } - /* Decide if it is OK to move the given block from source to target * A block is a good candidate if * 1. 
the block is not in the process of being moved/has not been moved; @@ -1369,7 +1263,7 @@ public class Balancer { return false; } // check if the block is moved or not - if (movedBlocks.contains(block)) { + if (movedBlocks.contains(block.getBlock())) { return false; } if (block.isLocatedOn(target)) { @@ -1387,7 +1281,7 @@ public class Balancer { } else { boolean notOnSameRack = true; synchronized (block) { - for (BalancerDatanode.StorageGroup loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { notOnSameRack = false; break; @@ -1399,7 +1293,7 @@ public class Balancer { goodBlock = true; } else { // good if source is on the same rack as on of the replicas - for (BalancerDatanode.StorageGroup loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (loc != source && cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) { goodBlock = true; @@ -1425,7 +1319,7 @@ public class Balancer { private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target, BalancerBlock block, Source source) { final DatanodeInfo targetDn = target.getDatanode(); - for (BalancerDatanode.StorageGroup loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (loc != source && cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) { return true; @@ -1489,7 +1383,7 @@ public class Balancer { * decide the number of bytes need to be moved */ final long bytesLeftToMove = init( - nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE)); + nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE)); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. 
Exiting..."); return ReturnStatus.SUCCESS; @@ -1558,8 +1452,8 @@ public class Balancer { final long sleeptime = 2000*conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); - LOG.info("namenodes = " + namenodes); - LOG.info("p = " + p); + LOG.info("namenodes = " + namenodes); + LOG.info("parameters = " + p); final Formatter formatter = new Formatter(System.out); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); @@ -1568,7 +1462,10 @@ public class Balancer { = new ArrayList<NameNodeConnector>(namenodes.size()); try { for (URI uri : namenodes) { - connectors.add(new NameNodeConnector(uri, conf)); + final NameNodeConnector nnc = new NameNodeConnector( + Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf); + nnc.getKeyManager().startBlockKeyUpdater(); + connectors.add(nnc); } boolean done = false; @@ -1730,9 +1627,6 @@ public class Balancer { public int run(String[] args) { final long startTime = Time.now(); final Configuration conf = getConf(); - WIN_WIDTH = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); try { checkReplicationPolicyCompatibility(conf); @@ -1761,9 +1655,9 @@ public class Balancer { if (args != null) { try { for(int i = 0; i < args.length; i++) { - checkArgument(args.length >= 2, "args = " + Arrays.toString(args)); if ("-threshold".equalsIgnoreCase(args[i])) { - i++; + checkArgument(++i < args.length, + "Threshold value is missing: args = " + Arrays.toString(args)); try { threshold = Double.parseDouble(args[i]); if (threshold < 1 || threshold > 100) { @@ -1778,7 +1672,8 @@ public class Balancer { throw e; } } else if ("-policy".equalsIgnoreCase(args[i])) { - i++; + checkArgument(++i < args.length, + "Policy value is missing: args = " + Arrays.toString(args)); try { policy = BalancingPolicy.parse(args[i]); } catch(IllegalArgumentException e) { @@ -1786,16 +1681,26 @@ public class Balancer { throw e; } } else if ("-exclude".equalsIgnoreCase(args[i])) { - i++; + checkArgument(++i < args.length, + "List of nodes to exclude | -f <filename> is missing: args = " + + Arrays.toString(args)); if ("-f".equalsIgnoreCase(args[i])) { - nodesTobeExcluded = Util.getHostListFromFile(args[++i]); + checkArgument(++i < args.length, + "File containing nodes to exclude is not specified: args = " + + Arrays.toString(args)); + nodesTobeExcluded = Util.getHostListFromFile(args[i]); } else { nodesTobeExcluded = Util.parseHostList(args[i]); } } else if ("-include".equalsIgnoreCase(args[i])) { - i++; + checkArgument(++i < args.length, + "List of nodes to include | -f <filename> is missing: args = " + + Arrays.toString(args)); if ("-f".equalsIgnoreCase(args[i])) { - nodesTobeIncluded = Util.getHostListFromFile(args[++i]); + checkArgument(++i < args.length, + "File containing nodes to include is not specified: args = " + + Arrays.toString(args)); + nodesTobeIncluded = Util.getHostListFromFile(args[i]); } else { nodesTobeIncluded = Util.parseHostList(args[i]); } @@ -1804,12 +1709,8 @@ public class Balancer { + Arrays.toString(args)); } } - if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) { - System.err.println( - "-exclude and -include options cannot be specified together."); - throw new IllegalArgumentException( - "-exclude and -include options cannot be specified together."); - } + checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(), + "-exclude and -include options cannot be specified 
together."); } catch(RuntimeException e) { printUsage(System.err); throw e; Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Thu Aug 7 07:38:23 2014 @@ -17,113 +17,96 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import java.io.Closeable; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; -import java.util.EnumSet; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Daemon; /** - * The class provides utilities for {@link Balancer} to access a NameNode + * The class provides utilities for accessing a NameNode. 
*/ @InterfaceAudience.Private -class NameNodeConnector implements DataEncryptionKeyFactory { - private static final Log LOG = Balancer.LOG; - private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); +public class NameNodeConnector implements Closeable { + private static final Log LOG = LogFactory.getLog(NameNodeConnector.class); + private static final int MAX_NOT_CHANGED_ITERATIONS = 5; - final URI nameNodeUri; - final String blockpoolID; + private final URI nameNodeUri; + private final String blockpoolID; + + private final NamenodeProtocol namenode; + private final ClientProtocol client; + private final KeyManager keyManager; + + private final FileSystem fs; + private final Path idPath; + private final OutputStream out; - final NamenodeProtocol namenode; - final ClientProtocol client; - final FileSystem fs; - final OutputStream out; - - private final boolean isBlockTokenEnabled; - private final boolean encryptDataTransfer; - private boolean shouldRun; - private long keyUpdaterInterval; - // used for balancer private int notChangedIterations = 0; - private BlockTokenSecretManager blockTokenSecretManager; - private Daemon keyupdaterthread; // AccessKeyUpdater thread - private DataEncryptionKey encryptionKey; - NameNodeConnector(URI nameNodeUri, + public NameNodeConnector(String name, URI nameNodeUri, Path idPath, Configuration conf) throws IOException { this.nameNodeUri = nameNodeUri; + this.idPath = idPath; - this.namenode = - NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class) - .getProxy(); - this.client = - NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class) - .getProxy(); + this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, + NamenodeProtocol.class).getProxy(); + this.client = NameNodeProxies.createProxy(conf, nameNodeUri, + ClientProtocol.class).getProxy(); this.fs = FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); this.blockpoolID = namespaceinfo.getBlockPoolID(); - final ExportedBlockKeys keys = namenode.getBlockKeys(); - this.isBlockTokenEnabled = keys.isBlockTokenEnabled(); - if (isBlockTokenEnabled) { - long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); - long blockTokenLifetime = keys.getTokenLifetime(); - LOG.info("Block token params received from NN: keyUpdateInterval=" - + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime=" - + blockTokenLifetime / (60 * 1000) + " min(s)"); - String encryptionAlgorithm = conf.get( - DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); - this.blockTokenSecretManager = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, blockpoolID, - encryptionAlgorithm); - this.blockTokenSecretManager.addKeys(keys); - /* - * Balancer should sync its block keys with NN more frequently than NN - * updates its block keys - */ - this.keyUpdaterInterval = blockKeyUpdateInterval / 4; - LOG.info("Balancer will update its block keys every " - + keyUpdaterInterval / (60 * 1000) + " minute(s)"); - this.keyupdaterthread = new Daemon(new BlockKeyUpdater()); - this.shouldRun = true; - this.keyupdaterthread.start(); - } - this.encryptDataTransfer = fs.getServerDefaults(new Path("/")) - .getEncryptDataTransfer(); - // Check if there is another balancer running. + final FsServerDefaults defaults = fs.getServerDefaults(new Path("/")); + this.keyManager = new KeyManager(blockpoolID, namenode, + defaults.getEncryptDataTransfer(), conf); // Exit if there is another one running. 
- out = checkAndMarkRunningBalancer(); + out = checkAndMarkRunning(); if (out == null) { - throw new IOException("Another balancer is running"); + throw new IOException("Another " + name + " is running."); } } - boolean shouldContinue(long dispatchBlockMoveBytes) { + /** @return the block pool ID */ + public String getBlockpoolID() { + return blockpoolID; + } + + /** @return the namenode proxy. */ + public NamenodeProtocol getNamenode() { + return namenode; + } + + /** @return the client proxy. */ + public ClientProtocol getClient() { + return client; + } + + /** @return the key manager */ + public KeyManager getKeyManager() { + return keyManager; + } + + /** Should the instance continue running? */ + public boolean shouldContinue(long dispatchBlockMoveBytes) { if (dispatchBlockMoveBytes > 0) { notChangedIterations = 0; } else { @@ -137,53 +120,25 @@ class NameNodeConnector implements DataE return true; } - /** Get an access token for a block. */ - Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb - ) throws IOException { - if (!isBlockTokenEnabled) { - return BlockTokenSecretManager.DUMMY_TOKEN; - } else { - if (!shouldRun) { - throw new IOException( - "Can not get access token. BlockKeyUpdater is not running"); - } - return blockTokenSecretManager.generateToken(null, eb, - EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE, - BlockTokenSecretManager.AccessMode.COPY)); - } - } - - @Override - public DataEncryptionKey newDataEncryptionKey() { - if (encryptDataTransfer) { - synchronized (this) { - if (encryptionKey == null) { - encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); - } - return encryptionKey; - } - } else { - return null; - } - } - /* The idea for making sure that there is no more than one balancer + /** + * The idea for making sure that there is no more than one instance * running in an HDFS is to create a file in the HDFS, writes the hostname - * of the machine on which the balancer is running to the file, but did not - * close the file until the balancer exits. - * This prevents the second balancer from running because it can not + * of the machine on which the instance is running to the file, but did not + * close the file until it exits. + * + * This prevents the second instance from running because it can not * creates the file while the first one is running. * - * This method checks if there is any running balancer and - * if no, mark yes if no. + * This method checks if there is any running instance. If no, mark yes. * Note that this is an atomic operation. * - * Return null if there is a running balancer; otherwise the output stream - * to the newly created file. + * @return null if there is a running instance; + * otherwise, the output stream to the newly created file. */ - private OutputStream checkAndMarkRunningBalancer() throws IOException { + private OutputStream checkAndMarkRunning() throws IOException { try { - final DataOutputStream out = fs.create(BALANCER_ID_PATH); + final DataOutputStream out = fs.create(idPath); out.writeBytes(InetAddress.getLocalHost().getHostName()); out.flush(); return out; @@ -196,24 +151,17 @@ class NameNodeConnector implements DataE } } - /** Close the connection. 
*/ - void close() { - shouldRun = false; - try { - if (keyupdaterthread != null) { - keyupdaterthread.interrupt(); - } - } catch(Exception e) { - LOG.warn("Exception shutting down access key updater thread", e); - } + @Override + public void close() { + keyManager.close(); // close the output file IOUtils.closeStream(out); if (fs != null) { try { - fs.delete(BALANCER_ID_PATH, true); + fs.delete(idPath, true); } catch(IOException ioe) { - LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe); + LOG.warn("Failed to delete " + idPath, ioe); } } } @@ -221,31 +169,6 @@ class NameNodeConnector { @Override public String toString() { return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri - + ", id=" + blockpoolID - + "]"; - } - - /** - * Periodically updates access keys. - */ - class BlockKeyUpdater implements Runnable { - @Override - public void run() { - try { - while (shouldRun) { - try { - blockTokenSecretManager.addKeys(namenode.getBlockKeys()); - } catch (IOException e) { - LOG.error("Failed to set keys", e); - } - Thread.sleep(keyUpdaterInterval); - } - } catch (InterruptedException e) { - LOG.debug("InterruptedException in block key updater thread", e); - } catch (Throwable e) { - LOG.error("Exception in block key updater thread", e); - shouldRun = false; - } - } + + ", bpid=" + blockpoolID + "]"; } }
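The single-instance guard in checkAndMarkRunning above relies on HDFS lease semantics: the first instance creates idPath and keeps the stream open, so a concurrent create by a second instance fails with AlreadyBeingCreatedException until the first one exits. A hedged sketch of the same pattern, simplified from the code above and not a drop-in replacement:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.InetAddress;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
    import org.apache.hadoop.ipc.RemoteException;

    public class InstanceLock {
      /** @return the open stream if this process won the race,
       *  or null if another instance already holds the marker file. */
      static DataOutputStream tryLock(FileSystem fs, Path idPath)
          throws IOException {
        try {
          // Fails while another writer still holds the file's lease open.
          DataOutputStream out = fs.create(idPath);
          out.writeBytes(InetAddress.getLocalHost().getHostName());
          out.flush();
          return out;  // intentionally left open until close()/exit
        } catch (RemoteException e) {
          if (e.unwrapRemoteException(AlreadyBeingCreatedException.class)
              instanceof AlreadyBeingCreatedException) {
            return null;  // someone else is running
          }
          throw e;
        }
      }
    }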
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Thu Aug 7 07:38:23 2014 @@ -21,7 +21,6 @@ import java.util.LinkedList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.util.LightWeightGSet; @@ -254,18 +253,18 @@ public class BlockInfo extends Block imp } /** * Find specified DatanodeStorageInfo. - * @return index or -1 if not found. + * @return DatanodeStorageInfo or null if not found. */ - int findStorageInfo(DatanodeInfo dn) { + DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) { int len = getCapacity(); for(int idx = 0; idx < len; idx++) { DatanodeStorageInfo cur = getStorageInfo(idx); if(cur == null) break; if(cur.getDatanodeDescriptor() == dn) - return idx; + return cur; } - return -1; + return null; } /** Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Aug 7 07:38:23 2014 @@ -1082,6 +1082,7 @@ public class BlockManager { * Mark the block belonging to datanode as corrupt * @param blk Block to be marked as corrupt * @param dn Datanode which holds the corrupt replica + * @param storageID if known, null otherwise. * @param reason a textual reason why the block should be marked corrupt, * for logging purposes */ @@ -1098,19 +1099,29 @@ public class BlockManager { + blk + " not found"); return; } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, - blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), - dn, storageID); - } - private void markBlockAsCorrupt(BlockToMarkCorrupt b, - DatanodeInfo dn, String storageID) throws IOException { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { - throw new IOException("Cannot mark " + b + throw new IOException("Cannot mark " + blk + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + ") does not exist"); } + + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), + storageID == null ? null : node.getStorageInfo(storageID), + node); + } + + /** + * + * @param b + * @param storageInfo storage that contains the block, if known. null otherwise. + * @throws IOException + */ + private void markBlockAsCorrupt(BlockToMarkCorrupt b, + DatanodeStorageInfo storageInfo, + DatanodeDescriptor node) throws IOException { BlockCollection bc = b.corrupted.getBlockCollection(); if (bc == null) { @@ -1121,7 +1132,9 @@ public class BlockManager { } // Add replica to the data-node if it is not already there - node.addBlock(storageID, b.stored); + if (storageInfo != null) { + storageInfo.addBlock(b.stored); + } // Add this replica to corruptReplicas Map corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, @@ -1481,6 +1494,8 @@ public class BlockManager { * * @throws IOException * if the number of targets < minimum replication.
+ * @see BlockPlacementPolicy#chooseTarget(String, int, Node, + * List, boolean, Set, long, StorageType) */ public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src, final int numOfReplicas, final DatanodeDescriptor client, @@ -1719,7 +1734,7 @@ public class BlockManager { * @throws IOException */ public boolean processReport(final DatanodeID nodeID, - final DatanodeStorage storage, final String poolId, + final DatanodeStorage storage, final BlockListAsLongs newReport) throws IOException { namesystem.writeLock(); final long startTime = Time.now(); //after acquiring write lock @@ -1751,9 +1766,9 @@ public class BlockManager { if (storageInfo.numBlocks() == 0) { // The first block report can be processed a lot more efficiently than // ordinary block reports. This shortens restart times. - processFirstBlockReport(node, storage.getStorageID(), newReport); + processFirstBlockReport(storageInfo, newReport); } else { - processReport(node, storage, newReport); + processReport(storageInfo, newReport); } // Now that we have an up-to-date block report, we know that any @@ -1815,9 +1830,8 @@ public class BlockManager { } } - private void processReport(final DatanodeDescriptor node, - final DatanodeStorage storage, - final BlockListAsLongs report) throws IOException { + private void processReport(final DatanodeStorageInfo storageInfo, + final BlockListAsLongs report) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. @@ -1827,19 +1841,20 @@ public class BlockManager { Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); - reportDiff(node, storage, report, + reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storage.getStorageID()); + addStoredBlockUnderConstruction(b, storageInfo); } for (Block b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -1853,7 +1868,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storage.getStorageID()); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -1864,16 +1879,16 @@ public class BlockManager { * a toRemove list (since there won't be any). It also silently discards * any invalid blocks, thereby deferring their processing until * the next block report. 
- * @param node - DatanodeDescriptor of the node that sent the report + * @param storageInfo - DatanodeStorageInfo that sent the report * @param report - the initial block report, to be processed * @throws IOException */ - private void processFirstBlockReport(final DatanodeDescriptor node, - final String storageID, + private void processFirstBlockReport( + final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); - assert (node.getStorageInfo(storageID).numBlocks() == 0); + assert (storageInfo.numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); while(itBR.hasNext()) { @@ -1882,7 +1897,7 @@ public class BlockManager { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); continue; } @@ -1894,15 +1909,16 @@ public class BlockManager { // If block is corrupt, mark it and continue to next block. BlockUCState ucState = storedBlock.getBlockUCState(); BlockToMarkCorrupt c = checkReplicaCorrupt( - iblk, reportedState, storedBlock, ucState, node); + iblk, reportedState, storedBlock, ucState, + storageInfo.getDatanodeDescriptor()); if (c != null) { if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { - markBlockAsCorrupt(c, node, storageID); + markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor()); } continue; } @@ -1910,7 +1926,7 @@ public class BlockManager { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( - node.getStorageInfo(storageID), iblk, reportedState); + storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. 
So we need to update such blocks to safemode // refer HDFS-5283 @@ -1923,12 +1939,12 @@ public class BlockManager { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, node, storageID); + addStoredBlockImmediate(storedBlock, storageInfo); } } } - private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, + private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, Collection<BlockInfo> toAdd, // add to DatanodeDescriptor Collection<Block> toRemove, // remove from DatanodeDescriptor @@ -1936,8 +1952,6 @@ public class BlockManager { Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<StatefulBlockInfo> toUC) { // add to under-construction list - final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID()); - // place a delimiter in the list which separates blocks // that have been reported from those that have not BlockInfo delimiter = new BlockInfo(new Block(), 1); @@ -1954,7 +1968,7 @@ public class BlockManager { while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState iState = itBR.getCurrentReplicaState(); - BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(), + BlockInfo storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list @@ -1991,7 +2005,7 @@ public class BlockManager { * BlockInfoUnderConstruction's list of replicas.</li> * </ol> * - * @param dn descriptor for the datanode that made the report + * @param storageInfo DatanodeStorageInfo that sent the report. * @param block reported block replica * @param reportedState reported replica state * @param toAdd add to DatanodeDescriptor @@ -2003,14 +2017,16 @@ public class BlockManager { * @return the up-to-date stored block, if it should be kept. * Otherwise, null. */ - private BlockInfo processReportedBlock(final DatanodeDescriptor dn, - final String storageID, + private BlockInfo processReportedBlock( + final DatanodeStorageInfo storageInfo, final Block block, final ReplicaState reportedState, final Collection<BlockInfo> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC) { + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + if(LOG.isDebugEnabled()) { LOG.debug("Reported block " + block + " on " + dn + " size " + block.getNumBytes() @@ -2019,7 +2035,7 @@ public class BlockManager { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { - queueReportedBlock(dn, storageID, block, reportedState, + queueReportedBlock(storageInfo, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } @@ -2059,7 +2075,7 @@ public class BlockManager { // TODO: Pretty confident this should be s/storedBlock/block below, // since we should be postponing the info of the reported block, not // the stored block. See HDFS-6289 for more context. - queueReportedBlock(dn, storageID, storedBlock, reportedState, + queueReportedBlock(storageInfo, storedBlock, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { toCorrupt.add(c); @@ -2088,17 +2104,17 @@ public class BlockManager { * standby node. @see PendingDataNodeMessages. 
* @param reason a textual reason to report in the debug logs */ - private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, String reason) { assert shouldPostponeBlocksFromFuture; if (LOG.isDebugEnabled()) { LOG.debug("Queueing reported block " + block + " in state " + reportedState + - " from datanode " + dn + " for later processing " + - "because " + reason + "."); + " from datanode " + storageInfo.getDatanodeDescriptor() + + " for later processing because " + reason + "."); } - pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState); + pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState); } /** @@ -2121,7 +2137,7 @@ public class BlockManager { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } - processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), + processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } } @@ -2178,6 +2194,16 @@ public class BlockManager { } else { return null; // not corrupt } + case UNDER_CONSTRUCTION: + if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { + final long reportedGS = reported.getGenerationStamp(); + return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + + ucState + " and reported state " + reportedState + + ", But reported genstamp " + reportedGS + + " does not match genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + } + return null; default: return null; } @@ -2241,19 +2267,20 @@ public class BlockManager { } void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, - DatanodeDescriptor node, String storageID) throws IOException { + DatanodeStorageInfo storageInfo) throws IOException { BlockInfoUnderConstruction block = ucBlock.storedBlock; - block.addReplicaIfNotPresent(node.getStorageInfo(storageID), - ucBlock.reportedBlock, ucBlock.reportedState); + block.addReplicaIfNotPresent( + storageInfo, ucBlock.reportedBlock, ucBlock.reportedState); - if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { - addStoredBlock(block, node, storageID, null, true); + if (ucBlock.reportedState == ReplicaState.FINALIZED && + block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) { + addStoredBlock(block, storageInfo, null, true); } } /** * Faster version of - * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)} + * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)} * , intended for use with initial block report at startup. If not in startup * safe mode, will call standard addStoredBlock(). 
Assumes this method is * called "immediately" so there is no need to refresh the storedBlock from @@ -2264,17 +2291,17 @@ public class BlockManager { * @throws IOException */ private void addStoredBlockImmediate(BlockInfo storedBlock, - DatanodeDescriptor node, String storageID) + DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, node, storageID, null, false); + addStoredBlock(storedBlock, storageInfo, null, false); return; } // just add it - node.addBlock(storageID, storedBlock); + storageInfo.addBlock(storedBlock); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2296,13 +2323,13 @@ public class BlockManager { * @return the block that is stored in blockMap. */ private Block addStoredBlock(final BlockInfo block, - DatanodeDescriptor node, - String storageID, + DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoUnderConstruction) { //refresh our copy in case the block got completed in another thread storedBlock = blocksMap.getStoredBlock(block); @@ -2322,7 +2349,7 @@ public class BlockManager { assert bc != null : "Block must belong to a file"; // add block to the datanode - boolean added = node.addBlock(storageID, storedBlock); + boolean added = storageInfo.addBlock(storedBlock); int curReplicaDelta; if (added) { @@ -2872,8 +2899,9 @@ public class BlockManager { * The given node is reporting that it received a certain block. */ @VisibleForTesting - void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) + void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint) throws IOException { + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Decrement number of blocks scheduled to this datanode. // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // RECEIVED_BLOCK), we currently also decrease the approximate number. @@ -2893,12 +2921,12 @@ public class BlockManager { // Modify the blocks->datanode map and node's map. 
// pendingReplications.decrement(block, node); - processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, + processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - private void processAndHandleReportedBlock(DatanodeDescriptor node, - String storageID, Block block, + private void processAndHandleReportedBlock( + DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block @@ -2906,7 +2934,9 @@ public class BlockManager { Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); - processReportedBlock(node, storageID, block, reportedState, + final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); + + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it @@ -2914,11 +2944,11 @@ public class BlockManager { : "The block should be only in one of the lists."; for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storageID); + addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2932,7 +2962,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storageID); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -2959,13 +2989,15 @@ public class BlockManager { "Got incremental block report from unregistered or dead node"); } - if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) { + DatanodeStorageInfo storageInfo = + node.getStorageInfo(srdb.getStorage().getStorageID()); + if (storageInfo == null) { // The DataNode is reporting an unknown storage. Usually the NN learns // about new storages from heartbeats but during NN restart we may // receive a block report or incremental report before the heartbeat. // We must handle this for protocol compatibility. This issue was // uncovered by HDFS-6094. - node.updateStorage(srdb.getStorage()); + storageInfo = node.updateStorage(srdb.getStorage()); } for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { @@ -2975,14 +3007,13 @@ public class BlockManager { deleted++; break; case RECEIVED_BLOCK: - addBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), rdbi.getDelHints()); + addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints()); received++; break; case RECEIVING_BLOCK: receiving++; - processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), ReplicaState.RBW, null); + processAndHandleReportedBlock(storageInfo, rdbi.getBlock(), + ReplicaState.RBW, null); break; default: String msg = @@ -3174,6 +3205,15 @@ public class BlockManager { } } } + + if (!status && !srcNode.isAlive) { + LOG.warn("srcNode " + srcNode + " is dead " + + "when decommission is in progress. Continue to mark " + + "it as decommission in progress. 
In that way, when it rejoins the " + + "cluster it can continue the decommission process."); + status = true; + } + srcNode.decommissioningStatus.set(underReplicatedBlocks, decommissionOnlyReplicas, underReplicatedInOpenFiles); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Thu Aug 7 07:38:23 2014 @@ -23,8 +23,8 @@ import org.apache.hadoop.hdfs.protocol.B import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; -import org.apache.hadoop.util.LightWeightGSet.SetIterator; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -217,9 +217,14 @@ class BlocksMap { BlockInfo currentBlock = blocks.get(newBlock); assert currentBlock != null : "the block if not in blocksMap"; // replace block in data-node lists - for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) { - DatanodeDescriptor dn = currentBlock.getDatanode(idx); - dn.replaceBlock(currentBlock, newBlock); + for (int i = currentBlock.numNodes() - 1; i >= 0; i--) { + final DatanodeDescriptor dn = currentBlock.getDatanode(i); + final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn); + final boolean removed = storage.removeBlock(currentBlock); + Preconditions.checkState(removed, "currentBlock not found."); + + final boolean added = storage.addBlock(newBlock); + Preconditions.checkState(added, "newBlock already exists."); } // replace block in the map itself blocks.put(newBlock); Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Thu Aug 7 07:38:23 2014 @@ -48,18 +48,6 @@ public class CorruptReplicasMap{ private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<Block, Map<DatanodeDescriptor, Reason>>(); - - /** - * Mark the block belonging to datanode as corrupt. 
- * - * @param blk Block to be added to CorruptReplicasMap - * @param dn DatanodeDescriptor which holds the corrupt replica - * @param reason a textual reason (for logging purposes) - */ - public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, - String reason) { - addToCorruptReplicasMap(blk, dn, reason, Reason.NONE); - } /** * Mark the block belonging to datanode as corrupt. @@ -69,7 +57,7 @@ public class CorruptReplicasMap{ * @param reason a textual reason (for logging purposes) * @param reasonCode the enum representation of the reason */ - public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, + void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, String reason, Reason reasonCode) { Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk); if (nodes == null) { @@ -127,7 +115,6 @@ public class CorruptReplicasMap{ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode, Reason reason) { Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk); - boolean removed = false; if (datanodes==null) return false; @@ -174,12 +161,12 @@ public class CorruptReplicasMap{ return ((nodes != null) && (nodes.contains(node))); } - public int numCorruptReplicas(Block blk) { + int numCorruptReplicas(Block blk) { Collection<DatanodeDescriptor> nodes = getNodes(blk); return (nodes == null) ? 0 : nodes.size(); } - public int size() { + int size() { return corruptReplicasMap.size(); } Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1616428&r1=1616427&r2=1616428&view=diff ============================================================================== --- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original) +++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Thu Aug 7 07:38:23 2014 @@ -234,18 +234,6 @@ public class DatanodeDescriptor extends updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0); } - /** - * Add data-node to the block. Add block to the head of the list of blocks - * belonging to the data-node. - */ - public boolean addBlock(String storageID, BlockInfo b) { - DatanodeStorageInfo s = getStorageInfo(storageID); - if (s != null) { - return s.addBlock(b); - } - return false; - } - @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { @@ -260,8 +248,8 @@ public class DatanodeDescriptor extends } public StorageReport[] getStorageReports() { - final StorageReport[] reports = new StorageReport[storageMap.size()]; final DatanodeStorageInfo[] infos = getStorageInfos(); + final StorageReport[] reports = new StorageReport[infos.length]; for(int i = 0; i < infos.length; i++) { reports[i] = infos[i].toStorageReport(); } @@ -284,13 +272,10 @@ public class DatanodeDescriptor extends * data-node from the block. 
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Thu Aug 7 07:38:23 2014
@@ -234,18 +234,6 @@ public class DatanodeDescriptor extends
     updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
-  /**
-   * Add data-node to the block. Add block to the head of the list of blocks
-   * belonging to the data-node.
-   */
-  public boolean addBlock(String storageID, BlockInfo b) {
-    DatanodeStorageInfo s = getStorageInfo(storageID);
-    if (s != null) {
-      return s.addBlock(b);
-    }
-    return false;
-  }
-
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -260,8 +248,8 @@ public class DatanodeDescriptor extends
   }
 
   public StorageReport[] getStorageReports() {
-    final StorageReport[] reports = new StorageReport[storageMap.size()];
     final DatanodeStorageInfo[] infos = getStorageInfos();
+    final StorageReport[] reports = new StorageReport[infos.length];
     for(int i = 0; i < infos.length; i++) {
       reports[i] = infos[i].toStorageReport();
     }
@@ -284,13 +272,10 @@ public class DatanodeDescriptor extends
    * data-node from the block.
    */
   boolean removeBlock(BlockInfo b) {
-    int index = b.findStorageInfo(this);
+    final DatanodeStorageInfo s = b.findStorageInfo(this);
     // if block exists on this datanode
-    if (index >= 0) {
-      DatanodeStorageInfo s = b.getStorageInfo(index);
-      if (s != null) {
-        return s.removeBlock(b);
-      }
+    if (s != null) {
+      return s.removeBlock(b);
     }
     return false;
   }
@@ -307,24 +292,6 @@ public class DatanodeDescriptor extends
     return false;
   }
 
-  /**
-   * Replace specified old block with a new one in the DataNodeDescriptor.
-   *
-   * @param oldBlock - block to be replaced
-   * @param newBlock - a replacement block
-   * @return the new block
-   */
-  public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
-    int index = oldBlock.findStorageInfo(this);
-    DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
-    boolean done = s.removeBlock(oldBlock);
-    assert done : "Old block should belong to the data-node when replacing";
-
-    done = s.addBlock(newBlock);
-    assert done : "New block should not belong to the data-node when replacing";
-    return newBlock;
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Thu Aug 7 07:38:23 2014
@@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  boolean addBlock(BlockInfo b) {
+  public boolean addBlock(BlockInfo b) {
     if(!b.addStorage(this))
       return false;
     // add to the head of the data-node list
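
In the removeBlock rewrite above, BlockInfo.findStorageInfo(DatanodeDescriptor) now hands back the DatanodeStorageInfo itself instead of an index into the block's triplets, collapsing two checks (index range, then null storage) into one null test. The lookup-then-mutate shape in isolation, with hypothetical stand-in types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class RemoveBlockSketch {
      // Hypothetical stand-in for DatanodeStorageInfo.
      static class Storage {
        private final Set<Long> blocks = new HashSet<Long>();
        boolean removeBlock(long id) { return blocks.remove(id); }
      }

      // Hypothetical stand-in for BlockInfo's per-datanode storage lookup;
      // findStorageInfo(dn) in the patch plays the role of this map get.
      private final Map<String, Storage> storageByNode = new HashMap<String, Storage>();

      boolean removeBlock(String datanodeUuid, long blockId) {
        Storage s = storageByNode.get(datanodeUuid);
        // A null storage means the block is not on this datanode; the
        // rewritten code returns false instead of indexing blindly.
        return s != null && s.removeBlock(blockId);
      }
    }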
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Thu Aug 7 07:38:23 2014
@@ -23,6 +23,7 @@ import java.util.Queue;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
 
   static class ReportedBlockInfo {
     private final Block block;
-    private final DatanodeDescriptor dn;
-    private final String storageID;
+    private final DatanodeStorageInfo storageInfo;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+    ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
         ReplicaState reportedState) {
-      this.dn = dn;
-      this.storageID = storageID;
+      this.storageInfo = storageInfo;
      this.block = block;
      this.reportedState = reportedState;
    }
 
@@ -57,21 +56,18 @@ class PendingDataNodeMessages {
       return block;
     }
 
-    DatanodeDescriptor getNode() {
-      return dn;
-    }
-
-    String getStorageID() {
-      return storageID;
-    }
-
     ReplicaState getReportedState() {
       return reportedState;
     }
+
+    DatanodeStorageInfo getStorageInfo() {
+      return storageInfo;
+    }
 
     @Override
     public String toString() {
-      return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+      return "ReportedBlockInfo [block=" + block + ", dn="
+          + storageInfo.getDatanodeDescriptor()
           + ", reportedState=" + reportedState + "]";
     }
   }
@@ -87,7 +83,7 @@ class PendingDataNodeMessages {
       Queue<ReportedBlockInfo> oldQueue = entry.getValue();
       while (!oldQueue.isEmpty()) {
         ReportedBlockInfo rbi = oldQueue.remove();
-        if (!rbi.getNode().equals(dn)) {
+        if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
           newQueue.add(rbi);
         } else {
           count--;
@@ -97,11 +93,11 @@ class PendingDataNodeMessages {
     }
   }
 
-  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+  void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, storageID, block, reportedState));
+        new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
 
@@ -127,7 +123,7 @@ class PendingDataNodeMessages {
     return queue;
   }
 
-  public int count() {
+  int count() {
     return count ;
   }
 
@@ -144,7 +140,7 @@ class PendingDataNodeMessages {
     return sb.toString();
   }
 
-  public Iterable<ReportedBlockInfo> takeAll() {
+  Iterable<ReportedBlockInfo> takeAll() {
     List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
         count);
     for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
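
ReportedBlockInfo now stores the DatanodeStorageInfo itself and derives the datanode on demand via getDatanodeDescriptor(), instead of carrying a (DatanodeDescriptor, storageID) pair whose two halves could drift apart. A compact sketch of the refactored queue (all types below are hypothetical stand-ins, not the real classes):

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    class PendingMessagesSketch {
      // Hypothetical stand-ins for DatanodeDescriptor / DatanodeStorageInfo.
      static class Node { final String uuid; Node(String uuid) { this.uuid = uuid; } }
      static class Storage {
        final Node node; final String storageId;
        Storage(Node node, String storageId) { this.node = node; this.storageId = storageId; }
        Node getDatanodeDescriptor() { return node; }
      }
      static class ReportedBlock {
        final Storage storage; final long blockId;
        ReportedBlock(Storage storage, long blockId) { this.storage = storage; this.blockId = blockId; }
      }

      private final Queue<ReportedBlock> queue = new ArrayDeque<ReportedBlock>();

      void enqueueReportedBlock(Storage storage, long blockId) {
        // A single storage reference replaces the old descriptor/storageID pair.
        queue.add(new ReportedBlock(storage, blockId));
      }

      // Drop all queued reports from one datanode, mirroring
      // removeAllMessagesForDatanode above.
      void removeAllMessagesForDatanode(Node dn) {
        for (Iterator<ReportedBlock> it = queue.iterator(); it.hasNext();) {
          if (it.next().storage.getDatanodeDescriptor().equals(dn)) {
            it.remove();
          }
        }
      }
    }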
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1616428&r1=1616427&r2=1616428&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Thu Aug 7 07:38:23 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
    */
   private long lastActiveClaimTxId = -1;
 
+  private final ReentrantReadWriteLock mReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock mReadLock = mReadWriteLock.readLock();
+  private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    mReadLock.lock();
+  }
+
+  void readUnlock() {
+    mReadLock.unlock();
+  }
+
+  void writeLock() {
+    mWriteLock.lock();
+  }
+
+  void writeUnlock() {
+    mWriteLock.unlock();
+  }
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
@@ -135,14 +160,19 @@ class BPOfferService {
     }
     return false;
   }
-
-  synchronized String getBlockPoolId() {
-    if (bpNSInfo != null) {
-      return bpNSInfo.getBlockPoolID();
-    } else {
-      LOG.warn("Block pool ID needed, but service not yet registered with NN",
-          new Exception("trace"));
-      return null;
+
+  String getBlockPoolId() {
+    readLock();
+    try {
+      if (bpNSInfo != null) {
+        return bpNSInfo.getBlockPoolID();
+      } else {
+        LOG.warn("Block pool ID needed, but service not yet registered with NN",
+            new Exception("trace"));
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -150,27 +180,37 @@ class BPOfferService {
     return getNamespaceInfo() != null;
   }
 
-  synchronized NamespaceInfo getNamespaceInfo() {
-    return bpNSInfo;
+  NamespaceInfo getNamespaceInfo() {
+    readLock();
+    try {
+      return bpNSInfo;
+    } finally {
+      readUnlock();
+    }
   }
 
   @Override
-  public synchronized String toString() {
-    if (bpNSInfo == null) {
-      // If we haven't yet connected to our NN, we don't yet know our
-      // own block pool ID.
-      // If _none_ of the block pools have connected yet, we don't even
-      // know the DatanodeID ID of this DN.
-      String datanodeUuid = dn.getDatanodeUuid();
+  public String toString() {
+    readLock();
+    try {
+      if (bpNSInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the DatanodeID ID of this DN.
+        String datanodeUuid = dn.getDatanodeUuid();
 
-      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
-        datanodeUuid = "unassigned";
+        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+          datanodeUuid = "unassigned";
+        }
+        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+      } else {
+        return "Block pool " + getBlockPoolId() +
+            " (Datanode Uuid " + dn.getDatanodeUuid() +
+            ")";
       }
-      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
-    } else {
-      return "Block pool " + getBlockPoolId() +
-          " (Datanode Uuid " + dn.getDatanodeUuid() +
-          ")";
+    } finally {
+      readUnlock();
     }
   }
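
From here on, the patch converts BPOfferService's synchronized methods into explicit read/write-locked sections: read-only accessors such as getBlockPoolId and toString take the shared read lock, mutators take the exclusive write lock, and every lock() is paired with unlock() in a finally block. The general shape of that conversion, reduced to a toy class (names are hypothetical):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockedStateSketch {
      private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
      private final Lock readLock = rwLock.readLock();
      private final Lock writeLock = rwLock.writeLock();

      private String state; // stand-in for bpNSInfo

      // Formerly "synchronized String get()": many readers may now
      // proceed concurrently as long as no writer holds the lock.
      String get() {
        readLock.lock();
        try {
          return state;
        } finally {
          readLock.unlock(); // always released, even on exception
        }
      }

      // Formerly "synchronized void set(...)": writers still exclude
      // both readers and other writers.
      void set(String newState) {
        writeLock.lock();
        try {
          state = newState;
        } finally {
          writeLock.unlock();
        }
      }
    }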
@@ -266,32 +306,37 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
-    if (this.bpNSInfo == null) {
-      this.bpNSInfo = nsInfo;
-      boolean success = false;
-
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      try {
-        dn.initBlockPool(this);
-        success = true;
-      } finally {
-        if (!success) {
-          // The datanode failed to initialize the BP. We need to reset
-          // the namespace info so that other BPService actors still have
-          // a chance to set it, and re-initialize the datanode.
-          this.bpNSInfo = null;
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    writeLock();
+    try {
+      if (this.bpNSInfo == null) {
+        this.bpNSInfo = nsInfo;
+        boolean success = false;
+
+        // Now that we know the namespace ID, etc, we can pass this to the DN.
+        // The DN can now initialize its local storage if we are the
+        // first BP to handshake, etc.
+        try {
+          dn.initBlockPool(this);
+          success = true;
+        } finally {
+          if (!success) {
+            // The datanode failed to initialize the BP. We need to reset
+            // the namespace info so that other BPService actors still have
+            // a chance to set it, and re-initialize the datanode.
+            this.bpNSInfo = null;
+          }
         }
+      } else {
+        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+            "Blockpool ID");
+        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+            "Namespace ID");
+        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+            "Cluster ID");
       }
-    } else {
-      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
-          "Blockpool ID");
-      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
-          "Namespace ID");
-      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
-          "Cluster ID");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -300,22 +345,27 @@ class BPOfferService {
    * NN, it calls this function to verify that the NN it connected to
    * is consistent with other NNs serving the block-pool.
    */
-  synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+  void registrationSucceeded(BPServiceActor bpServiceActor,
       DatanodeRegistration reg) throws IOException {
-    if (bpRegistration != null) {
-      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
-          reg.getStorageInfo().getNamespaceID(), "namespace ID");
-      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
-          reg.getStorageInfo().getClusterID(), "cluster ID");
-    } else {
-      bpRegistration = reg;
-    }
-
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-    // Add the initial block token secret keys to the DN's secret manager.
-    if (dn.isBlockTokenEnabled) {
-      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-          reg.getExportedKeys());
+    writeLock();
+    try {
+      if (bpRegistration != null) {
+        checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+            reg.getStorageInfo().getNamespaceID(), "namespace ID");
+        checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+            reg.getStorageInfo().getClusterID(), "cluster ID");
+      } else {
+        bpRegistration = reg;
+      }
+
+      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+      // Add the initial block token secret keys to the DN's secret manager.
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+            reg.getExportedKeys());
+      }
+    } finally {
+      writeUnlock();
     }
   }
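
verifyAndSetNamespaceInfo keeps its set-once-or-verify contract under the new write lock: the first caller publishes nsInfo and rolls it back if downstream initialization throws, so a later BPServiceActor can retry. That success/finally idiom on its own (hypothetical names, not the patch's code):

    import java.io.IOException;

    class SetOnceSketch {
      private Object nsInfo; // stand-in for bpNSInfo

      // Mirrors the set-once-or-verify shape of verifyAndSetNamespaceInfo.
      void verifyAndSet(Object candidate) throws IOException {
        if (nsInfo == null) {
          nsInfo = candidate;
          boolean success = false;
          try {
            initBlockPool(); // may throw
            success = true;
          } finally {
            if (!success) {
              nsInfo = null; // reset so another actor may attempt initialization
            }
          }
        } else if (!nsInfo.equals(candidate)) {
          throw new IOException("namespace mismatch");
        }
      }

      private void initBlockPool() throws IOException {
        // hypothetical initialization hook
      }
    }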
@@ -333,25 +383,35 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
-    Preconditions.checkState(bpNSInfo != null,
-        "getRegistration() can only be called after initial handshake");
-    return dn.createBPRegistration(bpNSInfo);
+  DatanodeRegistration createRegistration() {
+    writeLock();
+    try {
+      Preconditions.checkState(bpNSInfo != null,
+          "getRegistration() can only be called after initial handshake");
+      return dn.createBPRegistration(bpNSInfo);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Called when an actor shuts down. If this is the last actor
   * to shut down, shuts down the whole blockpool in the DN.
   */
-  synchronized void shutdownActor(BPServiceActor actor) {
-    if (bpServiceToActive == actor) {
-      bpServiceToActive = null;
-    }
+  void shutdownActor(BPServiceActor actor) {
+    writeLock();
+    try {
+      if (bpServiceToActive == actor) {
+        bpServiceToActive = null;
+      }
 
-    bpServices.remove(actor);
+      bpServices.remove(actor);
 
-    if (bpServices.isEmpty()) {
-      dn.shutdownBlockPool(this);
+      if (bpServices.isEmpty()) {
+        dn.shutdownBlockPool(this);
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -392,11 +452,16 @@ class BPOfferService {
    * @return a proxy to the active NN, or null if the BPOS has not
    * acknowledged any NN as active yet.
    */
-  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
-    if (bpServiceToActive != null) {
-      return bpServiceToActive.bpNamenode;
-    } else {
-      return null;
+  DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    readLock();
+    try {
+      if (bpServiceToActive != null) {
+        return bpServiceToActive.bpNamenode;
+      } else {
+        return null;
+      }
+    } finally {
+      readUnlock();
    }
  }
 
@@ -424,45 +489,50 @@ class BPOfferService {
    * @param actor the actor which received the heartbeat
    * @param nnHaState the HA-related heartbeat contents
    */
-  synchronized void updateActorStatesFromHeartbeat(
+  void updateActorStatesFromHeartbeat(
       BPServiceActor actor,
       NNHAStatusHeartbeat nnHaState) {
-    final long txid = nnHaState.getTxId();
-
-    final boolean nnClaimsActive =
-      nnHaState.getState() == HAServiceState.ACTIVE;
-    final boolean bposThinksActive = bpServiceToActive == actor;
-    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
-
-    if (nnClaimsActive && !bposThinksActive) {
-      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
-          "txid=" + txid);
-      if (!isMoreRecentClaim) {
-        // Split-brain scenario - an NN is trying to claim active
-        // state when a different NN has already claimed it with a higher
-        // txid.
-        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
-            txid + " but there was already a more recent claim at txid=" +
-            lastActiveClaimTxId);
-        return;
-      } else {
-        if (bpServiceToActive == null) {
-          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+    writeLock();
+    try {
+      final long txid = nnHaState.getTxId();
+
+      final boolean nnClaimsActive =
+          nnHaState.getState() == HAServiceState.ACTIVE;
+      final boolean bposThinksActive = bpServiceToActive == actor;
+      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+      if (nnClaimsActive && !bposThinksActive) {
+        LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+            "txid=" + txid);
+        if (!isMoreRecentClaim) {
+          // Split-brain scenario - an NN is trying to claim active
+          // state when a different NN has already claimed it with a higher
+          // txid.
+          LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+              txid + " but there was already a more recent claim at txid=" +
+              lastActiveClaimTxId);
+          return;
         } else {
-          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
-              bpServiceToActive + " at higher txid=" + txid);
+          if (bpServiceToActive == null) {
+            LOG.info("Acknowledging ACTIVE Namenode " + actor);
+          } else {
+            LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+                bpServiceToActive + " at higher txid=" + txid);
+          }
+          bpServiceToActive = actor;
        }
-        bpServiceToActive = actor;
+      } else if (!nnClaimsActive && bposThinksActive) {
+        LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+            "txid=" + nnHaState.getTxId());
+        bpServiceToActive = null;
      }
-    } else if (!nnClaimsActive && bposThinksActive) {
-      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
-          "txid=" + nnHaState.getTxId());
-      bpServiceToActive = null;
-    }
-
-    if (bpServiceToActive == actor) {
-      assert txid >= lastActiveClaimTxId;
-      lastActiveClaimTxId = txid;
+
+      if (bpServiceToActive == actor) {
+        assert txid >= lastActiveClaimTxId;
+        lastActiveClaimTxId = txid;
+      }
+    } finally {
+      writeUnlock();
    }
  }
 
@@ -533,12 +603,15 @@ class BPOfferService {
       actor.reRegister();
       return true;
     }
-    synchronized (this) {
+    writeLock();
+    try {
       if (actor == bpServiceToActive) {
         return processCommandFromActive(cmd, actor);
       } else {
         return processCommandFromStandby(cmd, actor);
       }
+    } finally {
+      writeUnlock();
    }
  }
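
updateActorStatesFromHeartbeat arbitrates competing ACTIVE claims by transaction ID: a claim wins only if its txid is strictly newer than the last acknowledged one, which is what rejects a stale NameNode during the split-brain scenario noted in the comments. The decision logic reduced to its core (hypothetical types, not the patch's code):

    class ActiveClaimSketch {
      private Object active;                 // stand-in for bpServiceToActive
      private long lastActiveClaimTxId = -1; // highest txid seen on a claim

      // Returns true when the claimant is (or becomes) the acknowledged
      // active node; a claim carrying a stale txid is rejected.
      boolean claimActive(Object claimant, long txid) {
        if (active != claimant) {
          if (txid <= lastActiveClaimTxId) {
            return false; // split-brain guard: a newer claim already won
          }
          active = claimant;
        }
        lastActiveClaimTxId = Math.max(lastActiveClaimTxId, txid);
        return true;
      }
    }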