Author: todd
Date: Thu Dec  1 21:26:08 2011
New Revision: 1209249

URL: http://svn.apache.org/viewvc?rev=1209249&view=rev
Log:
HDFS-2612. Handle refreshNameNodes in federated HA clusters. Contributed by 
Todd Lipcon.

Added:
    
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
    
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
Modified:
    
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
    
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Modified: 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1209249&r1=1209248&r2=1209249&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
 (original)
+++ 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
 Thu Dec  1 21:26:08 2011
@@ -35,3 +35,5 @@ HDFS-1971. Send block report from datano
 HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return 
HeartbeatResponse. (suresh)
 
 HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd)
+
+HDFS-2612. Handle refreshNameNodes in federated HA clusters (todd)

Modified: 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1209249&r1=1209248&r2=1209249&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 (original)
+++ 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 Thu Dec  1 21:26:08 2011
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +44,8 @@ import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -89,6 +93,21 @@ class BPOfferService {
     this.bpServiceToActive = this.bpServices.get(0);
   }
 
+  void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
+    Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
+    for (BPServiceActor actor : bpServices) {
+      oldAddrs.add(actor.getNNSocketAddress());
+    }
+    Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
+    
+    if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
+      // Keep things simple for now -- we can implement this at a later date.
+      throw new IOException(
+          "HA does not currently support adding a new standby to a running DN. 
" +
+          "Please do a rolling restart of DNs to reconfigure the list of 
NNs.");
+    }
+  }
+
   /**
    * returns true if BP thread has completed initialization of storage
    * and has registered with the corresponding namenode

Added: 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1209249&view=auto
==============================================================================
--- 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
 (added)
+++ 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
 Thu Dec  1 21:26:08 2011
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Manages the BPOfferService objects for the data node.
+ * Creation, removal, starting, stopping, shutdown on BPOfferService
+ * objects must be done via APIs in this class.
+ */
+@InterfaceAudience.Private
+class BlockPoolManager {
+  private static final Log LOG = DataNode.LOG;
+  
+  private final Map<String, BPOfferService> bpByNameserviceId =
+    Maps.newHashMap();
+  private final Map<String, BPOfferService> bpByBlockPoolId =
+    Maps.newHashMap();
+  private final List<BPOfferService> offerServices =
+    Lists.newArrayList();
+
+  private final DataNode dn;
+
+  //This lock is used only to ensure exclusion of refreshNamenodes
+  private final Object refreshNamenodesLock = new Object();
+  
+  BlockPoolManager(DataNode dn) {
+    this.dn = dn;
+  }
+  
+  synchronized void addBlockPool(BPOfferService bpos) {
+    Preconditions.checkArgument(offerServices.contains(bpos),
+        "Unknown BPOS: %s", bpos);
+    if (bpos.getBlockPoolId() == null) {
+      throw new IllegalArgumentException("Null blockpool id");
+    }
+    bpByBlockPoolId.put(bpos.getBlockPoolId(), bpos);
+  }
+  
+  /**
+   * Returns the array of BPOfferService objects. 
+   * Caution: The BPOfferService returned could be shutdown any time.
+   */
+  synchronized BPOfferService[] getAllNamenodeThreads() {
+    BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
+    return offerServices.toArray(bposArray);
+  }
+      
+  synchronized BPOfferService get(String bpid) {
+    return bpByBlockPoolId.get(bpid);
+  }
+  
+  // TODO(HA) would be good to kill this
+  synchronized BPOfferService get(InetSocketAddress addr) {
+    for (BPOfferService bpos : offerServices) {
+      if (bpos.containsNN(addr)) {
+        return bpos;
+      }
+    }
+    return null;
+  }
+
+  synchronized void remove(BPOfferService t) {
+    offerServices.remove(t);
+    bpByBlockPoolId.remove(t.getBlockPoolId());
+    
+    boolean removed = false;
+    for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();
+         it.hasNext() && !removed;) {
+      BPOfferService bpos = it.next();
+      if (bpos == t) {
+        it.remove();
+        LOG.info("Removed " + bpos);
+        removed = true;
+      }
+    }
+    
+    if (!removed) {
+      LOG.warn("Couldn't remove BPOS " + t + " from bpByNameserviceId map");
+    }
+  }
+  
+  void shutDownAll() throws InterruptedException {
+    BPOfferService[] bposArray = this.getAllNamenodeThreads();
+    
+    for (BPOfferService bpos : bposArray) {
+      bpos.stop(); //interrupts the threads
+    }
+    //now join
+    for (BPOfferService bpos : bposArray) {
+      bpos.join();
+    }
+  }
+  
+  synchronized void startAll() throws IOException {
+    try {
+      UserGroupInformation.getLoginUser().doAs(
+          new PrivilegedExceptionAction<Object>() {
+            public Object run() throws Exception {
+              for (BPOfferService bpos : offerServices) {
+                bpos.start();
+              }
+              return null;
+            }
+          });
+    } catch (InterruptedException ex) {
+      IOException ioe = new IOException();
+      ioe.initCause(ex.getCause());
+      throw ioe;
+    }
+  }
+  
+  void joinAll() {
+    for (BPOfferService bpos: this.getAllNamenodeThreads()) {
+      bpos.join();
+    }
+  }
+  
+  void refreshNamenodes(Configuration conf)
+      throws IOException {
+    LOG.info("Refresh request received for nameservices: "
+        + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
+    
+    Map<String, Map<String, InetSocketAddress>> newAddressMap = 
+      DFSUtil.getNNServiceRpcAddresses(conf);
+    
+    synchronized (refreshNamenodesLock) {
+      doRefreshNamenodes(newAddressMap);
+    }
+  }
+  
+  private void doRefreshNamenodes(
+      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
+    assert Thread.holdsLock(refreshNamenodesLock);
+
+    Set<String> toRefresh = Sets.newHashSet();
+    Set<String> toAdd = Sets.newHashSet();
+    Set<String> toRemove;
+    
+    synchronized (this) {
+      // Step 1. For each of the new nameservices, figure out whether
+      // it's an update of the set of NNs for an existing NS,
+      // or an entirely new nameservice.
+      for (String nameserviceId : addrMap.keySet()) {
+        if (bpByNameserviceId.containsKey(nameserviceId)) {
+          toRefresh.add(nameserviceId);
+        } else {
+          toAdd.add(nameserviceId);
+        }
+      }
+      
+      // Step 2. Any nameservices we currently have but are no longer present
+      // need to be removed.
+      toRemove = Sets.newHashSet(Sets.difference(
+          bpByNameserviceId.keySet(), addrMap.keySet()));
+      
+      assert toRefresh.size() + toAdd.size() ==
+        addrMap.size() :
+          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
+          "  toRemove: " + 
Joiner.on(",").useForNull("<default>").join(toRemove) +
+          "  toRefresh: " + 
Joiner.on(",").useForNull("<default>").join(toRefresh);
+
+      
+      // Step 3. Start new nameservices
+      if (!toAdd.isEmpty()) {
+        LOG.info("Starting BPOfferServices for nameservices: " +
+            Joiner.on(",").useForNull("<default>").join(toAdd));
+      
+        for (String nsToAdd : toAdd) {
+          ArrayList<InetSocketAddress> addrs =
+            Lists.newArrayList(addrMap.get(nsToAdd).values());
+          BPOfferService bpos = createBPOS(addrs);
+          bpByNameserviceId.put(nsToAdd, bpos);
+          offerServices.add(bpos);
+        }
+      }
+      startAll();
+    }
+
+    // Step 4. Shut down old nameservices. This happens outside
+    // of the synchronized(this) lock since they need to call
+    // back to .remove() from another thread
+    if (!toRemove.isEmpty()) {
+      LOG.info("Stopping BPOfferServices for nameservices: " +
+          Joiner.on(",").useForNull("<default>").join(toRemove));
+      
+      for (String nsToRemove : toRemove) {
+        BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
+        bpos.stop();
+        bpos.join();
+        // they will call remove on their own
+      }
+    }
+    
+    // Step 5. Update nameservices whose NN list has changed
+    if (!toRefresh.isEmpty()) {
+      LOG.info("Refreshing list of NNs for nameservices: " +
+          Joiner.on(",").useForNull("<default>").join(toRefresh));
+      
+      for (String nsToRefresh : toRefresh) {
+        BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
+        ArrayList<InetSocketAddress> addrs =
+          Lists.newArrayList(addrMap.get(nsToRefresh).values());
+        bpos.refreshNNList(addrs);
+      }
+    }
+  }
+
+  /**
+   * Extracted out for test purposes.
+   */
+  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+    return new BPOfferService(nnAddrs, dn);
+  }
+}
\ No newline at end of file

Modified: 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1209249&r1=1209248&r2=1209249&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 (original)
+++ 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 Thu Dec  1 21:26:08 2011
@@ -48,7 +48,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
 
 import java.io.BufferedOutputStream;
@@ -71,12 +70,10 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -91,7 +88,6 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -169,7 +165,6 @@ import org.apache.hadoop.util.VersionInf
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 
@@ -236,163 +231,6 @@ public class DataNode extends Configured
     return NetUtils.createSocketAddr(target);
   }
   
-  /**
-   * Manages he BPOfferService objects for the data node.
-   * Creation, removal, starting, stopping, shutdown on BPOfferService
-   * objects must be done via APIs in this class.
-   */
-  @InterfaceAudience.Private
-  class BlockPoolManager {
-    private final Map<String, BPOfferService> bpMapping;
-    private final List<BPOfferService> offerServices;
- 
-    //This lock is used only to ensure exclusion of refreshNamenodes
-    private final Object refreshNamenodesLock = new Object();
-    
-    BlockPoolManager(Configuration conf)
-        throws IOException {
-      bpMapping = new HashMap<String, BPOfferService>();
-      offerServices = new ArrayList<BPOfferService>();
-  
-      Map<String, Map<String, InetSocketAddress>> map =
-        DFSUtil.getNNServiceRpcAddresses(conf);
-      for (Entry<String, Map<String, InetSocketAddress>> entry :
-           map.entrySet()) {
-        List<InetSocketAddress> nnList = 
Lists.newArrayList(entry.getValue().values());
-        BPOfferService bpos = new BPOfferService(nnList, DataNode.this);
-        offerServices.add(bpos);
-      }
-    }
-    
-    synchronized void addBlockPool(BPOfferService bpos) {
-      Preconditions.checkArgument(offerServices.contains(bpos),
-          "Unknown BPOS: %s", bpos);
-      if (bpos.getBlockPoolId() == null) {
-        throw new IllegalArgumentException("Null blockpool id");
-      }
-      LOG.info("===> registering in bpmapping: " + bpos);
-      bpMapping.put(bpos.getBlockPoolId(), bpos);
-    }
-    
-    /**
-     * Returns the array of BPOfferService objects. 
-     * Caution: The BPOfferService returned could be shutdown any time.
-     */
-    synchronized BPOfferService[] getAllNamenodeThreads() {
-      BPOfferService[] bposArray = new BPOfferService[offerServices.size()];
-      return offerServices.toArray(bposArray);
-    }
-        
-    synchronized BPOfferService get(String bpid) {
-      return bpMapping.get(bpid);
-    }
-    
-    // TODO(HA) would be good to kill this
-    synchronized BPOfferService get(InetSocketAddress addr) {
-      for (BPOfferService bpos : offerServices) {
-        if (bpos.containsNN(addr)) {
-          return bpos;
-        }
-      }
-      return null;
-    }
-
-    synchronized void remove(BPOfferService t) {
-      offerServices.remove(t);
-      bpMapping.remove(t.getBlockPoolId());
-    }
-    
-    void shutDownAll() throws InterruptedException {
-      BPOfferService[] bposArray = this.getAllNamenodeThreads();
-      
-      for (BPOfferService bpos : bposArray) {
-        bpos.stop(); //interrupts the threads
-      }
-      //now join
-      for (BPOfferService bpos : bposArray) {
-        bpos.join();
-      }
-    }
-    
-    synchronized void startAll() throws IOException {
-      try {
-        UserGroupInformation.getLoginUser().doAs(
-            new PrivilegedExceptionAction<Object>() {
-              public Object run() throws Exception {
-                for (BPOfferService bpos : offerServices) {
-                  bpos.start();
-                }
-                return null;
-              }
-            });
-      } catch (InterruptedException ex) {
-        IOException ioe = new IOException();
-        ioe.initCause(ex.getCause());
-        throw ioe;
-      }
-    }
-    
-    void joinAll() {
-      for (BPOfferService bpos: this.getAllNamenodeThreads()) {
-        bpos.join();
-      }
-    }
-    
-    void refreshNamenodes(Configuration conf)
-        throws IOException {
-      throw new UnsupportedOperationException("TODO(HA)");
-/*
- * TODO(HA)
-
-      LOG.info("Refresh request received for nameservices: "
-          + conf.get(DFS_FEDERATION_NAMESERVICES));
-      
-      // TODO(HA): need to update this for multiple NNs per nameservice
-      // For now, just list all of the NNs into this set
-      Map<String, Map<String, InetSocketAddress>> newAddressMap = 
-        DFSUtil.getNNServiceRpcAddresses(conf);
-      Set<InetSocketAddress> newAddresses = Sets.newHashSet();
-      for (ConfiguredNNAddress cnn : DFSUtil.flattenAddressMap(newAddressMap)) 
{
-        newAddresses.add(cnn.getAddress());
-      }
-      
-      List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
-      List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
-      synchronized (refreshNamenodesLock) {
-        synchronized (this) {
-          for (InetSocketAddress nnaddr : offerServices.keySet()) {
-            if (!(newAddresses.contains(nnaddr))) {
-              toShutdown.add(offerServices.get(nnaddr));
-            }
-          }
-          for (InetSocketAddress nnaddr : newAddresses) {
-            if (!(offerServices.containsKey(nnaddr))) {
-              toStart.add(nnaddr);
-            }
-          }
-
-          for (InetSocketAddress nnaddr : toStart) {
-            BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this);
-            offerServices.put(bpos.getNNSocketAddress(), bpos);
-          }
-        }
-
-        for (BPOfferService bpos : toShutdown) {
-          bpos.stop();
-          bpos.join();
-        }
-        
-        // stoping the BPOSes causes them to call remove() on their own when 
they
-        // clean up.
-        
-        // Now start the threads that are not already running.
-        startAll();
-      }
-      */
-    }
-
-  }
-  
   volatile boolean shouldRun = true;
   private BlockPoolManager blockPoolManager;
   public volatile FSDatasetInterface data = null;
@@ -779,7 +617,8 @@ public class DataNode extends Configured
 
     metrics = DataNodeMetrics.create(conf, getMachineName());
 
-    blockPoolManager = new BlockPoolManager(conf);
+    blockPoolManager = new BlockPoolManager(this);
+    blockPoolManager.refreshNamenodes(conf);
   }
   
   /**

Added: 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java?rev=1209249&view=auto
==============================================================================
--- 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
 (added)
+++ 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
 Thu Dec  1 21:26:08 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+public class TestBlockPoolManager {
+  private Log LOG = LogFactory.getLog(TestBlockPoolManager.class);
+  private DataNode mockDN = Mockito.mock(DataNode.class);
+  private BlockPoolManager bpm;
+  private StringBuilder log = new StringBuilder();
+  private int mockIdx = 1;
+  
+  @Before
+  public void setupBPM() {
+    bpm = new BlockPoolManager(mockDN){
+
+      @Override
+      protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+        final int idx = mockIdx++;
+        doLog("create #" + idx);
+        final BPOfferService bpos = Mockito.mock(BPOfferService.class);
+        Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString();
+        // Log refreshes
+        try {
+          Mockito.doAnswer(
+              new Answer<Void>() {
+                @Override
+                public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                  doLog("refresh #" + idx);
+                  return null;
+                }
+              }).when(bpos).refreshNNList(
+                  Mockito.<ArrayList<InetSocketAddress>>any());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        // Log stops
+        Mockito.doAnswer(
+            new Answer<Void>() {
+              @Override
+              public Void answer(InvocationOnMock invocation) throws Throwable 
{
+                doLog("stop #" + idx);
+                bpm.remove(bpos);
+                return null;
+              }
+            }).when(bpos).stop();
+        return bpos;
+      }
+    };
+  }
+  
+  private void doLog(String string) {
+    synchronized(log) {
+      LOG.info(string);
+      log.append(string).append("\n");
+    }
+  }
+
+  @Test
+  public void testSimpleSingleNS() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY,
+        "hdfs://mock1:8020");
+    bpm.refreshNamenodes(conf);
+    assertEquals("create #1\n", log.toString());
+  }
+
+  @Test
+  public void testFederationRefresh() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1,ns2");
+    addNN(conf, "ns1", "mock1:8020");
+    addNN(conf, "ns2", "mock1:8020");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "create #1\n" +
+        "create #2\n", log.toString());
+    log.setLength(0);
+
+    // Remove the first NS
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "stop #1\n" +
+        "refresh #2\n", log.toString());
+    log.setLength(0);
+    
+    // Add back an NS -- this creates a new BPOS since the old
+    // one for ns2 should have been previously retired
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES,
+        "ns1,ns2");
+    bpm.refreshNamenodes(conf);
+    assertEquals(
+        "create #3\n" +
+        "refresh #2\n", log.toString());
+  }
+
+  private static void addNN(Configuration conf, String ns, String addr) {
+    String key = DFSUtil.addKeySuffixes(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);
+    conf.set(key, addr);
+  }
+}


Reply via email to