Author: todd
Date: Fri Nov 18 00:45:31 2011
New Revision: 1203444

URL: http://svn.apache.org/viewvc?rev=1203444&view=rev
Log:
HDFS-2560. Refactor BPOfferService to be a static inner class. Contributed by 
Todd Lipcon.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1203444&r1=1203443&r2=1203444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Nov 18 00:45:31 2011
@@ -107,6 +107,7 @@ Release 0.23.1 - UNRELEASED
   NEW FEATURES
 
   IMPROVEMENTS
+    HDFS-2560. Refactor BPOfferService to be a static inner class (todd)
 
   OPTIMIZATIONS
 

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1203444&r1=1203443&r2=1203444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Fri Nov 18 00:45:31 2011
@@ -55,6 +55,10 @@ public class BlockPoolTokenSecretManager
     }
     return secretMgr;
   }
+  
+  public synchronized boolean isBlockPoolRegistered(String bpid) {
+    return map.containsKey(bpid);
+  }
 
   /** Return an empty BlockTokenIdentifer */
   @Override

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1203444&r1=1203443&r2=1203444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Nov 18 00:45:31 2011
@@ -136,6 +136,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
@@ -275,7 +276,7 @@ public class DataNode extends Configured
   
       List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
       for(InetSocketAddress isa : isas) {
-        BPOfferService bpos = new BPOfferService(isa);
+        BPOfferService bpos = new BPOfferService(isa, DataNode.this);
         nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
       }
     }
@@ -373,19 +374,19 @@ public class DataNode extends Configured
           }
 
           for (InetSocketAddress nnaddr : toStart) {
-            BPOfferService bpos = new BPOfferService(nnaddr);
+            BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
             nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
           }
-
-          for (BPOfferService bpos : toShutdown) {
-            remove(bpos);
-          }
         }
 
         for (BPOfferService bpos : toShutdown) {
           bpos.stop();
           bpos.join();
         }
+        
+        // stoping the BPOSes causes them to call remove() on their own when they
+        // clean up.
+        
         // Now start the threads that are not already running.
         startAll();
       }
@@ -402,9 +403,7 @@ public class DataNode extends Configured
   Daemon dataXceiverServer = null;
   ThreadGroup threadGroup = null;
   long blockReportInterval;
-  boolean resetBlockReportTime = true;
   long deleteReportInterval;
-  long lastDeletedReport = 0;
   long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
   long heartBeatInterval;
   private boolean heartbeatsDisabledForTests = false;
@@ -653,6 +652,7 @@ public class DataNode extends Configured
       return;
     }
     String reason = null;
+    assert data != null;
     if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
                     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
       reason = "verification is turned off by configuration";
@@ -774,11 +774,15 @@ public class DataNode extends Configured
    * </ul>
    */
   @InterfaceAudience.Private
-  class BPOfferService implements Runnable {
+  static class BPOfferService implements Runnable {
     final InetSocketAddress nnAddr;
     DatanodeRegistration bpRegistration;
     NamespaceInfo bpNSInfo;
     long lastBlockReport = 0;
+    long lastDeletedReport = 0;
+
+    boolean resetBlockReportTime = true;
+
     private Thread bpThread;
     private DatanodeProtocol bpNamenode;
     private String blockPoolId;
@@ -788,14 +792,13 @@ public class DataNode extends Configured
       = new LinkedList<ReceivedDeletedBlockInfo>();
     private volatile int pendingReceivedRequests = 0;
     private volatile boolean shouldServiceRun = true;
-    private boolean isBlockTokenInitialized = false;
     UpgradeManagerDatanode upgradeManager = null;
+    private final DataNode dn;
 
-    BPOfferService(InetSocketAddress isa) {
-      this.bpRegistration = new DatanodeRegistration(getMachineName());
-      bpRegistration.setInfoPort(infoServer.getPort());
-      bpRegistration.setIpcPort(getIpcPort());
-      this.nnAddr = isa;
+    BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
+      this.dn = dn;
+      this.bpRegistration = dn.createRegistration();
+      this.nnAddr = nnAddr;
     }
 
     /**
@@ -822,7 +825,6 @@ public class DataNode extends Configured
     void setNamespaceInfo(NamespaceInfo nsinfo) {
       bpNSInfo = nsinfo;
       this.blockPoolId = nsinfo.getBlockPoolID();
-      blockPoolManager.addBlockPool(this);
     }
 
     void setNameNode(DatanodeProtocol dnProtocol) {
@@ -831,7 +833,7 @@ public class DataNode extends Configured
 
     private NamespaceInfo handshake() throws IOException {
       NamespaceInfo nsInfo = new NamespaceInfo();
-      while (shouldRun && shouldServiceRun) {
+      while (dn.shouldRun && shouldServiceRun) {
         try {
           nsInfo = bpNamenode.versionRequest();
           // verify build version
@@ -867,7 +869,7 @@ public class DataNode extends Configured
       return nsInfo;
     }
 
-    void setupBP(Configuration conf, AbstractList<File> dataDirs) 
+    void setupBP(Configuration conf) 
     throws IOException {
       // get NN proxy
       DatanodeProtocol dnp = 
@@ -878,52 +880,19 @@ public class DataNode extends Configured
       // handshake with NN
       NamespaceInfo nsInfo = handshake();
       setNamespaceInfo(nsInfo);
-      synchronized(DataNode.this) {
-        // we do not allow namenode from different cluster to register
-        if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
-          throw new IOException(
-              "cannot register with the namenode because clusterid do not match:"
-              + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + 
-              ";dn cid=" + clusterId);
-        }
-
-        setupBPStorage();
-
-        setClusterId(nsInfo.clusterID);
-      }
-    
-      initPeriodicScanners(conf);
-    }
-    
-    void setupBPStorage() throws IOException {
-      StartupOption startOpt = getStartupOption(conf);
-      assert startOpt != null : "Startup option must be set.";
-
-      boolean simulatedFSDataset = conf.getBoolean(
-          DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-          DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
+      dn.initBlockPool(this, nsInfo);
       
-      if (simulatedFSDataset) {
-        initFsDataSet(conf, dataDirs);
-        bpRegistration.setStorageID(getStorageId()); //same as DN
+      bpRegistration.setStorageID(dn.getStorageId());
+      StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId);
+      if (storageInfo == null) {
+        // it's null in the case of SimulatedDataSet
         bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION;
-        bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
-        bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
+        bpRegistration.setStorageInfo(nsInfo);
       } else {
-        // read storage info, lock data dirs and transition fs state if necessary          
-        storage.recoverTransitionRead(DataNode.this, blockPoolId, bpNSInfo,
-            dataDirs, startOpt);
-        LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
-            + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
-            + bpNSInfo);
-
-        bpRegistration.setStorageID(getStorageId());
-        bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
-        initFsDataSet(conf, dataDirs);
+        bpRegistration.setStorageInfo(storageInfo);
       }
-      data.addBlockPool(blockPoolId, conf);
     }
-
+    
     /**
      * This methods  arranges for the data node to send the block report at 
      * the next heartbeat.
@@ -931,9 +900,9 @@ public class DataNode extends Configured
     void scheduleBlockReport(long delay) {
       if (delay > 0) { // send BR after random delay
         lastBlockReport = System.currentTimeMillis()
-        - ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
+        - ( dn.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
       } else { // send at next heartbeat
-        lastBlockReport = lastHeartbeat - blockReportInterval;
+        lastBlockReport = lastHeartbeat - dn.blockReportInterval;
       }
       resetBlockReportTime = true; // reset future BRs for randomness
     }
@@ -1038,11 +1007,11 @@ public class DataNode extends Configured
       // send block report if timer has expired.
       DatanodeCommand cmd = null;
       long startTime = now();
-      if (startTime - lastBlockReport > blockReportInterval) {
+      if (startTime - lastBlockReport > dn.blockReportInterval) {
 
         // Create block report
         long brCreateStartTime = now();
-        BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
+        BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId);
 
         // Send block report
         long brSendStartTime = now();
@@ -1052,7 +1021,7 @@ public class DataNode extends Configured
         // Log the block report processing stats from Datanode perspective
         long brSendCost = now() - brSendStartTime;
         long brCreateCost = brSendStartTime - brCreateStartTime;
-        metrics.addBlockReport(brSendCost);
+        dn.metrics.addBlockReport(brSendCost);
         LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
             + " blocks took " + brCreateCost + " msec to generate and "
             + brSendCost + " msecs for RPC and NN processing");
@@ -1060,7 +1029,7 @@ public class DataNode extends Configured
         // If we have sent the first block report, then wait a random
         // time before we start the periodic block reports.
         if (resetBlockReportTime) {
-          lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval));
+          lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dn.blockReportInterval));
           resetBlockReportTime = false;
         } else {
           /* say the last block report was at 8:20:14. The current report
@@ -1070,7 +1039,7 @@ public class DataNode extends Configured
            *   2) unexpected like 11:35:43, next report should be at 12:20:14
            */
           lastBlockReport += (now() - lastBlockReport) /
-          blockReportInterval * blockReportInterval;
+          dn.blockReportInterval * dn.blockReportInterval;
         }
         LOG.info("sent block report, processed command:" + cmd);
       }
@@ -1080,12 +1049,12 @@ public class DataNode extends Configured
     
     DatanodeCommand [] sendHeartBeat() throws IOException {
       return bpNamenode.sendHeartbeat(bpRegistration,
-          data.getCapacity(),
-          data.getDfsUsed(),
-          data.getRemaining(),
-          data.getBlockPoolUsed(blockPoolId),
-          xmitsInProgress.get(),
-          getXceiverCount(), data.getNumFailedVolumes());
+          dn.data.getCapacity(),
+          dn.data.getDfsUsed(),
+          dn.data.getRemaining(),
+          dn.data.getBlockPoolUsed(blockPoolId),
+          dn.xmitsInProgress.get(),
+          dn.getXceiverCount(), dn.data.getNumFailedVolumes());
     }
     
     //This must be called only by blockPoolManager
@@ -1121,21 +1090,9 @@ public class DataNode extends Configured
       
       if(upgradeManager != null)
         upgradeManager.shutdownUpgrade();
-      
-      blockPoolManager.remove(this);
       shouldServiceRun = false;
       RPC.stopProxy(bpNamenode);
-      if (blockScanner != null) {
-        blockScanner.removeBlockPool(this.getBlockPoolId());
-      }
-    
-      if (data != null) { 
-        data.shutdownBlockPool(this.getBlockPoolId());
-      }
-
-      if (storage != null) {
-        storage.removeBlockPoolStorage(this.getBlockPoolId());
-      }
+      dn.shutdownBlockPool(this);
     }
 
     /**
@@ -1144,22 +1101,22 @@ public class DataNode extends Configured
      */
     private void offerService() throws Exception {
       LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
-          + deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
-          + blockReportInterval + "msec" + " Initial delay: "
-          + initialBlockReportDelay + "msec" + "; heartBeatInterval="
-          + heartBeatInterval);
+          + dn.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+          + dn.blockReportInterval + "msec" + " Initial delay: "
+          + dn.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+          + dn.heartBeatInterval);
 
       //
       // Now loop for a long time....
       //
-      while (shouldRun && shouldServiceRun) {
+      while (dn.shouldRun && shouldServiceRun) {
         try {
           long startTime = now();
 
           //
           // Every so often, send heartbeat or block-report
           //
-          if (startTime - lastHeartbeat > heartBeatInterval) {
+          if (startTime - lastHeartbeat > dn.heartBeatInterval) {
             //
             // All heartbeat messages include following info:
             // -- Datanode name
@@ -1168,9 +1125,9 @@ public class DataNode extends Configured
             // -- Bytes remaining
             //
             lastHeartbeat = startTime;
-            if (!heartbeatsDisabledForTests) {
+            if (!dn.heartbeatsDisabledForTests) {
               DatanodeCommand[] cmds = sendHeartBeat();
-              metrics.addHeartbeat(now() - startTime);
+              dn.metrics.addHeartbeat(now() - startTime);
 
               long startProcessCommands = now();
               if (!processCommand(cmds))
@@ -1183,7 +1140,7 @@ public class DataNode extends Configured
             }
           }
           if (pendingReceivedRequests > 0
-              || (startTime - lastDeletedReport > deleteReportInterval)) {
+              || (startTime - lastDeletedReport > dn.deleteReportInterval)) {
             reportReceivedDeletedBlocks();
             lastDeletedReport = startTime;
           }
@@ -1192,15 +1149,15 @@ public class DataNode extends Configured
           processCommand(cmd);
 
           // Now safe to start scanning the block pool
-          if (blockScanner != null) {
-            blockScanner.addBlockPool(this.blockPoolId);
+          if (dn.blockScanner != null) {
+            dn.blockScanner.addBlockPool(this.blockPoolId);
           }
 
           //
           // There is no work to do;  sleep until hearbeat timer elapses, 
           // or work arrives, and then iterate again.
           //
-          long waitTime = heartBeatInterval - 
+          long waitTime = dn.heartBeatInterval - 
           (System.currentTimeMillis() - lastHeartbeat);
           synchronized(receivedAndDeletedBlockList) {
             if (waitTime > 0 && pendingReceivedRequests == 0) {
@@ -1223,7 +1180,7 @@ public class DataNode extends Configured
           }
           LOG.warn("RemoteException in offerService", re);
           try {
-            long sleepTime = Math.min(1000, heartBeatInterval);
+            long sleepTime = Math.min(1000, dn.heartBeatInterval);
             Thread.sleep(sleepTime);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
@@ -1269,7 +1226,7 @@ public class DataNode extends Configured
           (bpNSInfo.getLayoutVersion(), "namenode");
       }
 
-      while(shouldRun && shouldServiceRun) {
+      while(dn.shouldRun && shouldServiceRun) {
         try {
           // Use returned registration from namenode with updated machine name.
           bpRegistration = bpNamenode.registerDatanode(bpRegistration);
@@ -1277,8 +1234,6 @@ public class DataNode extends Configured
           LOG.info("bpReg after =" + bpRegistration.storageInfo + 
               ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
 
-          NetUtils.getHostname();
-          hostName = bpRegistration.getHost();
           break;
         } catch(SocketTimeoutException e) {  // namenode is busy
           LOG.info("Problem connecting to server: " + nnAddr);
@@ -1287,47 +1242,13 @@ public class DataNode extends Configured
           } catch (InterruptedException ie) {}
         }
       }
-
-      if (storage.getStorageID().equals("")) {
-        storage.setStorageID(bpRegistration.getStorageID());
-        storage.writeAll();
-        LOG.info("New storage id " + bpRegistration.getStorageID()
-            + " is assigned to data-node " + bpRegistration.getName());
-      } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
-        throw new IOException("Inconsistent storage IDs. Name-node returned "
-            + bpRegistration.getStorageID() 
-            + ". Expecting " + storage.getStorageID());
-      }
-
-      if (!isBlockTokenInitialized) {
-        /* first time registering with NN */
-        ExportedBlockKeys keys = bpRegistration.exportedKeys;
-        isBlockTokenEnabled = keys.isBlockTokenEnabled();
-        if (isBlockTokenEnabled) {
-          long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-          long blockTokenLifetime = keys.getTokenLifetime();
-          LOG.info("Block token params received from NN: for block pool " +
-              blockPoolId + " keyUpdateInterval="
-              + blockKeyUpdateInterval / (60 * 1000)
-              + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
-              + " min(s)");
-          final BlockTokenSecretManager secretMgr = 
-            new BlockTokenSecretManager(false, 0, blockTokenLifetime);
-          blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
-        }
-        isBlockTokenInitialized = true;
-      }
-
-      if (isBlockTokenEnabled) {
-        blockPoolTokenSecretManager.setKeys(blockPoolId,
-            bpRegistration.exportedKeys);
-        bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
-      }
+      
+      dn.bpRegistrationSucceeded(bpRegistration, blockPoolId);
 
       LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
 
       // random short delay - helps scatter the BR from all DNs
-      scheduleBlockReport(initialBlockReportDelay);
+      scheduleBlockReport(dn.initialBlockReportDelay);
     }
 
 
@@ -1341,14 +1262,14 @@ public class DataNode extends Configured
      */
     @Override
     public void run() {
-      LOG.info(bpRegistration + "In BPOfferService.run, data = " + data
+      LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data
           + ";bp=" + blockPoolId);
 
       try {
         // init stuff
         try {
           // setup storage
-          setupBP(conf, dataDirs);
+          setupBP(dn.conf);
           register();
         } catch (IOException ioe) {
           // Initial handshake, storage recovery or registration failed
@@ -1360,13 +1281,13 @@ public class DataNode extends Configured
 
         initialized = true; // bp is initialized;
         
-        while (shouldRun && shouldServiceRun) {
+        while (dn.shouldRun && shouldServiceRun) {
           try {
             startDistributedUpgradeIfNeeded();
             offerService();
           } catch (Exception ex) {
             LOG.error("Exception in BPOfferService", ex);
-            if (shouldRun && shouldServiceRun) {
+            if (dn.shouldRun && shouldServiceRun) {
               try {
                 Thread.sleep(5000);
               } catch (InterruptedException ie) {
@@ -1379,7 +1300,7 @@ public class DataNode extends Configured
         LOG.warn("Unexpected exception", ex);
       } finally {
         LOG.warn(bpRegistration + " ending block pool service for: " 
-            + blockPoolId);
+            + blockPoolId + " thread " + Thread.currentThread().getId());
         cleanUp();
       }
     }
@@ -1420,8 +1341,8 @@ public class DataNode extends Configured
       switch(cmd.getAction()) {
       case DatanodeProtocol.DNA_TRANSFER:
         // Send a copy of a block to another datanode
-        transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
-        metrics.incrBlocksReplicated(bcmd.getBlocks().length);
+        dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+        dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
         break;
       case DatanodeProtocol.DNA_INVALIDATE:
         //
@@ -1430,16 +1351,16 @@ public class DataNode extends Configured
         //
         Block toDelete[] = bcmd.getBlocks();
         try {
-          if (blockScanner != null) {
-            blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
+          if (dn.blockScanner != null) {
+            dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
           }
           // using global fsdataset
-          data.invalidate(bcmd.getBlockPoolId(), toDelete);
+          dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
         } catch(IOException e) {
-          checkDiskError();
+          dn.checkDiskError();
           throw e;
         }
-        metrics.incrBlocksRemoved(toDelete.length);
+        dn.metrics.incrBlocksRemoved(toDelete.length);
         break;
       case DatanodeProtocol.DNA_SHUTDOWN:
         // shut down the data node
@@ -1448,12 +1369,12 @@ public class DataNode extends Configured
       case DatanodeProtocol.DNA_REGISTER:
         // namenode requested a registration - at start or if NN lost contact
         LOG.info("DatanodeCommand action: DNA_REGISTER");
-        if (shouldRun && shouldServiceRun) {
+        if (dn.shouldRun && shouldServiceRun) {
           register();
         }
         break;
       case DatanodeProtocol.DNA_FINALIZE:
-        storage.finalizeUpgrade(((FinalizeCommand) cmd)
+        dn.storage.finalizeUpgrade(((FinalizeCommand) cmd)
             .getBlockPoolId());
         break;
       case UpgradeCommand.UC_ACTION_START_UPGRADE:
@@ -1461,12 +1382,12 @@ public class DataNode extends Configured
         processDistributedUpgradeCommand((UpgradeCommand)cmd);
         break;
       case DatanodeProtocol.DNA_RECOVERBLOCK:
-        recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+        dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
         break;
       case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
         LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-        if (isBlockTokenEnabled) {
-          blockPoolTokenSecretManager.setKeys(blockPoolId, 
+        if (dn.isBlockTokenEnabled) {
+          dn.blockPoolTokenSecretManager.setKeys(blockPoolId, 
               ((KeyUpdateCommand) cmd).getExportedKeys());
         }
         break;
@@ -1476,7 +1397,7 @@ public class DataNode extends Configured
                    ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
         if (bandwidth > 0) {
           DataXceiverServer dxcs =
-                       (DataXceiverServer) dataXceiverServer.getRunnable();
+                       (DataXceiverServer) dn.dataXceiverServer.getRunnable();
           dxcs.balanceThrottler.setBandwidth(bandwidth);
         }
         break;
@@ -1495,7 +1416,7 @@ public class DataNode extends Configured
     synchronized UpgradeManagerDatanode getUpgradeManager() {
       if(upgradeManager == null)
         upgradeManager = 
-          new UpgradeManagerDatanode(DataNode.this, blockPoolId);
+          new UpgradeManagerDatanode(dn, blockPoolId);
       
       return upgradeManager;
     }
@@ -1555,6 +1476,133 @@ public class DataNode extends Configured
     blockPoolManager = new BlockPoolManager(conf);
   }
   
+  /**
+   * Check that the registration returned from a NameNode is consistent
+   * with the information in the storage. If the storage is fresh/unformatted,
+   * sets the storage ID based on this registration.
+   * Also updates the block pool's state in the secret manager.
+   */
+  private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
+      String blockPoolId)
+      throws IOException {
+    hostName = bpRegistration.getHost();
+
+    if (storage.getStorageID().equals("")) {
+      // This is a fresh datanode -- take the storage ID provided by the
+      // NN and persist it.
+      storage.setStorageID(bpRegistration.getStorageID());
+      storage.writeAll();
+      LOG.info("New storage id " + bpRegistration.getStorageID()
+          + " is assigned to data-node " + bpRegistration.getName());
+    } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
+      throw new IOException("Inconsistent storage IDs. Name-node returned "
+          + bpRegistration.getStorageID() 
+          + ". Expecting " + storage.getStorageID());
+    }
+    
+    registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
+  }
+  
+  /**
+   * After the block pool has contacted the NN, registers that block pool
+   * with the secret manager, updating it with the secrets provided by the NN.
+   * @param bpRegistration
+   * @param blockPoolId
+   * @throws IOException
+   */
+  private void registerBlockPoolWithSecretManager(DatanodeRegistration bpRegistration,
+      String blockPoolId) throws IOException {
+    ExportedBlockKeys keys = bpRegistration.exportedKeys;
+    isBlockTokenEnabled = keys.isBlockTokenEnabled();
+    // TODO should we check that all federated nns are either enabled or
+    // disabled?
+    if (!isBlockTokenEnabled) return;
+    
+    if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
+      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+      long blockTokenLifetime = keys.getTokenLifetime();
+      LOG.info("Block token params received from NN: for block pool " +
+          blockPoolId + " keyUpdateInterval="
+          + blockKeyUpdateInterval / (60 * 1000)
+          + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+          + " min(s)");
+      final BlockTokenSecretManager secretMgr = 
+        new BlockTokenSecretManager(false, 0, blockTokenLifetime);
+      blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
+    }
+    
+    blockPoolTokenSecretManager.setKeys(blockPoolId,
+        bpRegistration.exportedKeys);
+    bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
+  }
+
+  /**
+   * Remove the given block pool from the block scanner, dataset, and storage.
+   */
+  private void shutdownBlockPool(BPOfferService bpos) {
+    blockPoolManager.remove(bpos);
+
+    String bpId = bpos.getBlockPoolId();
+    if (blockScanner != null) {
+      blockScanner.removeBlockPool(bpId);
+    }
+  
+    if (data != null) { 
+      data.shutdownBlockPool(bpId);
+    }
+
+    if (storage != null) {
+      storage.removeBlockPoolStorage(bpId);
+    }
+  }
+
+  void initBlockPool(BPOfferService bpOfferService,
+      NamespaceInfo nsInfo) throws IOException {
+    String blockPoolId = nsInfo.getBlockPoolID();
+
+    blockPoolManager.addBlockPool(bpOfferService);
+
+    synchronized (this) {
+      // we do not allow namenode from different cluster to register
+      if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
+        throw new IOException(
+            "cannot register with the namenode because clusterid do not match:"
+            + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + 
+            ";dn cid=" + clusterId);
+      }
+
+      setClusterId(nsInfo.clusterID);
+    }
+    
+    StartupOption startOpt = getStartupOption(conf);
+    assert startOpt != null : "Startup option must be set.";
+
+    boolean simulatedFSDataset = conf.getBoolean(
+        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
+        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
+    
+    if (!simulatedFSDataset) {
+      // read storage info, lock data dirs and transition fs state if necessary          
+      storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
+          dataDirs, startOpt);
+      StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
+      LOG.info("setting up storage: nsid=" +
+          bpStorage.getNamespaceID() + ";bpid="
+          + blockPoolId + ";lv=" + storage.getLayoutVersion() +
+          ";nsInfo=" + nsInfo);
+    }
+    initFsDataSet();
+    initPeriodicScanners(conf);    
+    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+  }
+
+  private DatanodeRegistration createRegistration() {
+    DatanodeRegistration reg = new DatanodeRegistration(getMachineName());
+    reg.setInfoPort(infoServer.getPort());
+    reg.setIpcPort(getIpcPort());
+    return reg;
+  }
+
   BPOfferService[] getAllBpOs() {
     return blockPoolManager.getAllNamenodeThreads();
   }
@@ -1567,8 +1615,7 @@ public class DataNode extends Configured
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
    */
-  private synchronized void initFsDataSet(Configuration conf,
-      AbstractList<File> dataDirs) throws IOException {
+  private synchronized void initFsDataSet() throws IOException {
     if (data != null) { // Already initialized
       return;
     }

Modified: 
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java?rev=1203444&r1=1203443&r2=1203444&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java Fri Nov 18 00:45:31 2011
@@ -61,7 +61,7 @@ public class DataNodeTestUtils {
       bpos.setNamespaceInfo(nsifno);
 
       dn.setBPNamenode(bpid, nn);
-      bpos.setupBPStorage();
+      dn.initBlockPool(bpos, nsifno);
     }
   }
 }


Reply via email to