HDFS-6940. Refactoring to allow ConsensusNode implementation. Contributed by Konstantin Shvachko.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88209ce1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88209ce1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88209ce1 Branch: refs/heads/HDFS-6581 Commit: 88209ce181b5ecc55c0ae2bceff4893ab4817e88 Parents: 3b35f81 Author: Konstantin V Shvachko <s...@apache.org> Authored: Sat Sep 6 12:07:52 2014 -0700 Committer: Konstantin V Shvachko <s...@apache.org> Committed: Sat Sep 6 12:07:52 2014 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../server/blockmanagement/BlockManager.java | 23 ++++++++-- .../server/blockmanagement/DatanodeManager.java | 6 ++- .../server/blockmanagement/HostFileManager.java | 4 ++ .../hdfs/server/namenode/FSNamesystem.java | 46 +++++++++++--------- .../hdfs/server/namenode/NameNodeAdapter.java | 2 +- 6 files changed, 57 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 333bdce..4412b30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -444,6 +444,8 @@ Release 2.6.0 - UNRELEASED HDFS-6376. Distcp data between two HA clusters requires another configuration. (Dave Marion and Haohui Mai via jing9) + HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 8470680..6176188 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -164,7 +164,7 @@ public class BlockManager { final BlocksMap blocksMap; /** Replication thread. */ - final Daemon replicationThread = new Daemon(new ReplicationMonitor()); + Daemon replicationThread; /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); @@ -263,6 +263,7 @@ public class BlockManager { this.namesystem = namesystem; datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); + setReplicationMonitor(new ReplicationMonitor()); final long pendingPeriod = conf.getLong( DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY, @@ -394,7 +395,23 @@ public class BlockManager { lifetimeMin*60*1000L, 0, null, encryptionAlgorithm); } } - + + public long getReplicationRecheckInterval() { + return replicationRecheckInterval; + } + + public AtomicLong excessBlocksCount() { + return excessBlocksCount; + } + + public void clearInvalidateBlocks() { + invalidateBlocks.clear(); + } + + void setReplicationMonitor(Runnable replicationMonitor) { + replicationThread = new Daemon(replicationMonitor); + } + public void setBlockPoolId(String blockPoolId) { if (isBlockTokenEnabled()) { blockTokenSecretManager.setBlockPoolId(blockPoolId); @@ -1616,7 +1633,7 @@ public class BlockManager { * If there were any replication requests that timed out, reap them * and put them back into the neededReplication queue */ - private void processPendingReplications() { + void processPendingReplications() { Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 709f060..55d616f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1053,7 +1053,7 @@ public class DatanodeManager { * 3. Added to exclude --> start decommission. * 4. Removed from exclude --> stop decommission. */ - private void refreshDatanodes() { + void refreshDatanodes() { for(DatanodeDescriptor node : datanodeMap.values()) { // Check if not include. if (!hostFileManager.isIncluded(node)) { @@ -1586,5 +1586,9 @@ public class DatanodeManager { public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) { this.shouldSendCachingCommands = shouldSendCachingCommands; } + + public HostFileManager getHostFileManager() { + return this.hostFileManager; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java index 0b8d6c5..7db23e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HostFileManager.java @@ -129,6 +129,10 @@ class HostFileManager { void refresh(String includeFile, String excludeFile) throws IOException { HostSet newIncludes = readFile("included", includeFile); HostSet newExcludes = readFile("excluded", excludeFile); + setHosts(newIncludes, newExcludes); + } + + void setHosts(HostSet newIncludes, HostSet newExcludes) { synchronized (this) { includes = newIncludes; excludes = newExcludes; http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index c1744f6..a6b98a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -978,7 +978,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return Collections.unmodifiableList(auditLoggers); } - private void loadFSImage(StartupOption startOpt) throws IOException { + protected void loadFSImage(StartupOption startOpt) throws IOException { final FSImage fsImage = getFSImage(); // format before starting up if requested @@ -1026,7 +1026,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, imageLoadComplete(); } - private void startSecretManager() { + protected void startSecretManager() { if (dtSecretManager != null) { try { dtSecretManager.startThreads(); @@ -1038,7 +1038,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } - private void startSecretManagerIfNecessary() { + protected void startSecretManagerIfNecessary() { boolean shouldRun = shouldUseDelegationTokens() && !isInSafeMode() && getEditLog().isOpenForWrite(); boolean running = dtSecretManager.isRunning(); @@ -1188,7 +1188,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return haEnabled && inActiveState() && startingActiveService; } - private boolean shouldUseDelegationTokens() { + protected boolean shouldUseDelegationTokens() { return UserGroupInformation.isSecurityEnabled() || alwaysUseDelegationTokensForTests; } @@ -2729,6 +2729,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws UnresolvedLinkException * @throws IOException */ + protected LocatedBlock prepareFileForWrite(String src, INodeFile file, String leaseHolder, String clientMachine, boolean writeToEditLog, @@ -3185,6 +3186,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return new FileState(pendingFile, src); } + protected LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs, long offset) throws IOException { LocatedBlock lBlk = new LocatedBlock( @@ -3302,8 +3304,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return true; } - private INodeFile checkLease(String src, String holder, INode inode, - long fileId) + protected INodeFile checkLease(String src, String holder, INode inode, + long fileId) throws LeaseExpiredException, FileNotFoundException { assert hasReadLock(); final String ident = src + " (inode " + fileId + ")"; @@ -4420,7 +4422,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return leaseManager.reassignLease(lease, src, newHolder); } - private void commitOrCompleteLastBlock(final INodeFile fileINode, + protected void commitOrCompleteLastBlock(final INodeFile fileINode, final Block commitBlock) throws IOException { assert hasWriteLock(); Preconditions.checkArgument(fileINode.isUnderConstruction()); @@ -4816,6 +4818,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @return an array of datanode commands * @throws IOException */ + protected HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, int failedVolumes) @@ -4865,8 +4868,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @param file * @param logRetryCache */ - private void persistBlocks(String path, INodeFile file, - boolean logRetryCache) { + protected void persistBlocks(String path, INodeFile file, + boolean logRetryCache) { assert hasWriteLock(); Preconditions.checkArgument(file.isUnderConstruction()); getEditLog().logUpdateBlocks(path, file, logRetryCache); @@ -5297,7 +5300,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @param path * @param file */ - private void persistNewBlock(String path, INodeFile file) { + protected void persistNewBlock(String path, INodeFile file) { Preconditions.checkArgument(file.isUnderConstruction()); getEditLog().logAddBlock(path, file); if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -7175,7 +7178,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * * @return true if delegation token operation is allowed */ - private boolean isAllowedDelegationTokenOp() throws IOException { + protected boolean isAllowedDelegationTokenOp() throws IOException { AuthenticationMethod authMethod = getConnectionAuthenticationMethod(); if (UserGroupInformation.isSecurityEnabled() && (authMethod != AuthenticationMethod.KERBEROS) @@ -7342,7 +7345,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); blockManager.getDatanodeManager().fetchDatanodes(live, null, true); for (DatanodeDescriptor node : live) { - Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder() + info.put(node.getHostName(), getLiveNodeInfo(node)); + } + return JSON.toString(info); + } + + protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) { + return ImmutableMap.<String, Object>builder() .put("infoAddr", node.getInfoAddr()) .put("infoSecureAddr", node.getInfoSecureAddr()) .put("xferaddr", node.getXferAddr()) @@ -7360,10 +7369,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats, .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent()) .put("volfails", node.getVolumeFailures()) .build(); - - info.put(node.getHostName(), innerinfo); - } - return JSON.toString(info); } /** @@ -7648,17 +7653,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, public ReentrantLock getLongReadLockForTests() { return fsLock.longReadLock; } - - @VisibleForTesting - public SafeModeInfo getSafeModeInfoForTests() { - return safeMode; - } @VisibleForTesting public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) { this.nnResourceChecker = nnResourceChecker; } + public SafeModeInfo getSafeModeInfo() { + return safeMode; + } + @Override public boolean isAvoidingStaleDataNodesForWrite() { return this.blockManager.getDatanodeManager() http://git-wip-us.apache.org/repos/asf/hadoop/blob/88209ce1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index c32ed67..d65d1ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -223,7 +223,7 @@ public class NameNodeAdapter { * if safemode is not running. */ public static int getSafeModeSafeBlocks(NameNode nn) { - SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests(); + SafeModeInfo smi = nn.getNamesystem().getSafeModeInfo(); if (smi == null) { return -1; }