Author: todd
Date: Thu Dec 1 21:26:08 2011
New Revision: 1209249

URL: http://svn.apache.org/viewvc?rev=1209249&view=rev
Log:
HDFS-2612. Handle refreshNameNodes in federated HA clusters. Contributed by Todd Lipcon.
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1209249&r1=1209248&r2=1209249&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original) +++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Thu Dec 1 21:26:08 2011 @@ -35,3 +35,5 @@ HDFS-1971. Send block report from datano HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse. (suresh) HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd) + +HDFS-2612. 
Handle refreshNameNodes in federated HA clusters (todd) Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1209249&r1=1209248&r2=1209249&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Thu Dec 1 21:26:08 2011 @@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.da import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; @@ -42,6 +44,8 @@ import org.apache.hadoop.ipc.RPC; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * One instance per block-pool/namespace on the DN, which handles the @@ -89,6 +93,21 @@ class BPOfferService { this.bpServiceToActive = this.bpServices.get(0); } + void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException { + Set<InetSocketAddress> oldAddrs = Sets.newHashSet(); + for (BPServiceActor actor : bpServices) { + oldAddrs.add(actor.getNNSocketAddress()); + } + Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs); + + if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) { + // Keep things simple for now -- we can implement this at a later date. 
+ throw new IOException( + "HA does not currently support adding a new standby to a running DN. " + + "Please do a rolling restart of DNs to reconfigure the list of NNs."); + } + } + /** * returns true if BP thread has completed initialization of storage * and has registered with the corresponding namenode Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1209249&view=auto ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (added) +++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Thu Dec 1 21:26:08 2011 @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Manages the BPOfferService objects for the data node. + * Creation, removal, starting, stopping, shutdown on BPOfferService + * objects must be done via APIs in this class. + */ +@InterfaceAudience.Private +class BlockPoolManager { + private static final Log LOG = DataNode.LOG; + + private final Map<String, BPOfferService> bpByNameserviceId = + Maps.newHashMap(); + private final Map<String, BPOfferService> bpByBlockPoolId = + Maps.newHashMap(); + private final List<BPOfferService> offerServices = + Lists.newArrayList(); + + private final DataNode dn; + + //This lock is used only to ensure exclusion of refreshNamenodes + private final Object refreshNamenodesLock = new Object(); + + BlockPoolManager(DataNode dn) { + this.dn = dn; + } + + synchronized void addBlockPool(BPOfferService bpos) { + Preconditions.checkArgument(offerServices.contains(bpos), + "Unknown BPOS: %s", bpos); + if (bpos.getBlockPoolId() == null) { + throw new IllegalArgumentException("Null blockpool id"); + } + bpByBlockPoolId.put(bpos.getBlockPoolId(), bpos); + } + + /** + * Returns the array of BPOfferService objects. + * Caution: The BPOfferService returned could be shutdown any time. 
+ */ + synchronized BPOfferService[] getAllNamenodeThreads() { + BPOfferService[] bposArray = new BPOfferService[offerServices.size()]; + return offerServices.toArray(bposArray); + } + + synchronized BPOfferService get(String bpid) { + return bpByBlockPoolId.get(bpid); + } + + // TODO(HA) would be good to kill this + synchronized BPOfferService get(InetSocketAddress addr) { + for (BPOfferService bpos : offerServices) { + if (bpos.containsNN(addr)) { + return bpos; + } + } + return null; + } + + synchronized void remove(BPOfferService t) { + offerServices.remove(t); + bpByBlockPoolId.remove(t.getBlockPoolId()); + + boolean removed = false; + for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator(); + it.hasNext() && !removed;) { + BPOfferService bpos = it.next(); + if (bpos == t) { + it.remove(); + LOG.info("Removed " + bpos); + removed = true; + } + } + + if (!removed) { + LOG.warn("Couldn't remove BPOS " + t + " from bpByNameserviceId map"); + } + } + + void shutDownAll() throws InterruptedException { + BPOfferService[] bposArray = this.getAllNamenodeThreads(); + + for (BPOfferService bpos : bposArray) { + bpos.stop(); //interrupts the threads + } + //now join + for (BPOfferService bpos : bposArray) { + bpos.join(); + } + } + + synchronized void startAll() throws IOException { + try { + UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction<Object>() { + public Object run() throws Exception { + for (BPOfferService bpos : offerServices) { + bpos.start(); + } + return null; + } + }); + } catch (InterruptedException ex) { + IOException ioe = new IOException(); + ioe.initCause(ex.getCause()); + throw ioe; + } + } + + void joinAll() { + for (BPOfferService bpos: this.getAllNamenodeThreads()) { + bpos.join(); + } + } + + void refreshNamenodes(Configuration conf) + throws IOException { + LOG.info("Refresh request received for nameservices: " + + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES)); + + Map<String, Map<String, 
InetSocketAddress>> newAddressMap = + DFSUtil.getNNServiceRpcAddresses(conf); + + synchronized (refreshNamenodesLock) { + doRefreshNamenodes(newAddressMap); + } + } + + private void doRefreshNamenodes( + Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException { + assert Thread.holdsLock(refreshNamenodesLock); + + Set<String> toRefresh = Sets.newHashSet(); + Set<String> toAdd = Sets.newHashSet(); + Set<String> toRemove; + + synchronized (this) { + // Step 1. For each of the new nameservices, figure out whether + // it's an update of the set of NNs for an existing NS, + // or an entirely new nameservice. + for (String nameserviceId : addrMap.keySet()) { + if (bpByNameserviceId.containsKey(nameserviceId)) { + toRefresh.add(nameserviceId); + } else { + toAdd.add(nameserviceId); + } + } + + // Step 2. Any nameservices we currently have but are no longer present + // need to be removed. + toRemove = Sets.newHashSet(Sets.difference( + bpByNameserviceId.keySet(), addrMap.keySet())); + + assert toRefresh.size() + toAdd.size() == + addrMap.size() : + "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) + + " toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) + + " toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh); + + + // Step 3. Start new nameservices + if (!toAdd.isEmpty()) { + LOG.info("Starting BPOfferServices for nameservices: " + + Joiner.on(",").useForNull("<default>").join(toAdd)); + + for (String nsToAdd : toAdd) { + ArrayList<InetSocketAddress> addrs = + Lists.newArrayList(addrMap.get(nsToAdd).values()); + BPOfferService bpos = createBPOS(addrs); + bpByNameserviceId.put(nsToAdd, bpos); + offerServices.add(bpos); + } + } + startAll(); + } + + // Step 4. Shut down old nameservices. 
This happens outside + // of the synchronized(this) lock since they need to call + // back to .remove() from another thread + if (!toRemove.isEmpty()) { + LOG.info("Stopping BPOfferServices for nameservices: " + + Joiner.on(",").useForNull("<default>").join(toRemove)); + + for (String nsToRemove : toRemove) { + BPOfferService bpos = bpByNameserviceId.get(nsToRemove); + bpos.stop(); + bpos.join(); + // they will call remove on their own + } + } + + // Step 5. Update nameservices whose NN list has changed + if (!toRefresh.isEmpty()) { + LOG.info("Refreshing list of NNs for nameservices: " + + Joiner.on(",").useForNull("<default>").join(toRefresh)); + + for (String nsToRefresh : toRefresh) { + BPOfferService bpos = bpByNameserviceId.get(nsToRefresh); + ArrayList<InetSocketAddress> addrs = + Lists.newArrayList(addrMap.get(nsToRefresh).values()); + bpos.refreshNNList(addrs); + } + } + } + + /** + * Extracted out for test purposes. + */ + protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) { + return new BPOfferService(nnAddrs, dn); + } +} \ No newline at end of file Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1209249&r1=1209248&r2=1209249&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Dec 1 21:26:08 2011 @@ -48,7 +48,6 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY; import java.io.BufferedOutputStream; @@ -71,12 +70,10 @@ import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -91,7 +88,6 @@ import org.apache.hadoop.fs.LocalFileSys import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -169,7 +165,6 @@ import org.apache.hadoop.util.VersionInf import org.mortbay.util.ajax.JSON; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -236,163 +231,6 @@ public class DataNode extends Configured return NetUtils.createSocketAddr(target); } - /** - * Manages he BPOfferService objects for the data node. - * Creation, removal, starting, stopping, shutdown on BPOfferService - * objects must be done via APIs in this class. 
- */ - @InterfaceAudience.Private - class BlockPoolManager { - private final Map<String, BPOfferService> bpMapping; - private final List<BPOfferService> offerServices; - - //This lock is used only to ensure exclusion of refreshNamenodes - private final Object refreshNamenodesLock = new Object(); - - BlockPoolManager(Configuration conf) - throws IOException { - bpMapping = new HashMap<String, BPOfferService>(); - offerServices = new ArrayList<BPOfferService>(); - - Map<String, Map<String, InetSocketAddress>> map = - DFSUtil.getNNServiceRpcAddresses(conf); - for (Entry<String, Map<String, InetSocketAddress>> entry : - map.entrySet()) { - List<InetSocketAddress> nnList = Lists.newArrayList(entry.getValue().values()); - BPOfferService bpos = new BPOfferService(nnList, DataNode.this); - offerServices.add(bpos); - } - } - - synchronized void addBlockPool(BPOfferService bpos) { - Preconditions.checkArgument(offerServices.contains(bpos), - "Unknown BPOS: %s", bpos); - if (bpos.getBlockPoolId() == null) { - throw new IllegalArgumentException("Null blockpool id"); - } - LOG.info("===> registering in bpmapping: " + bpos); - bpMapping.put(bpos.getBlockPoolId(), bpos); - } - - /** - * Returns the array of BPOfferService objects. - * Caution: The BPOfferService returned could be shutdown any time. 
- */ - synchronized BPOfferService[] getAllNamenodeThreads() { - BPOfferService[] bposArray = new BPOfferService[offerServices.size()]; - return offerServices.toArray(bposArray); - } - - synchronized BPOfferService get(String bpid) { - return bpMapping.get(bpid); - } - - // TODO(HA) would be good to kill this - synchronized BPOfferService get(InetSocketAddress addr) { - for (BPOfferService bpos : offerServices) { - if (bpos.containsNN(addr)) { - return bpos; - } - } - return null; - } - - synchronized void remove(BPOfferService t) { - offerServices.remove(t); - bpMapping.remove(t.getBlockPoolId()); - } - - void shutDownAll() throws InterruptedException { - BPOfferService[] bposArray = this.getAllNamenodeThreads(); - - for (BPOfferService bpos : bposArray) { - bpos.stop(); //interrupts the threads - } - //now join - for (BPOfferService bpos : bposArray) { - bpos.join(); - } - } - - synchronized void startAll() throws IOException { - try { - UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction<Object>() { - public Object run() throws Exception { - for (BPOfferService bpos : offerServices) { - bpos.start(); - } - return null; - } - }); - } catch (InterruptedException ex) { - IOException ioe = new IOException(); - ioe.initCause(ex.getCause()); - throw ioe; - } - } - - void joinAll() { - for (BPOfferService bpos: this.getAllNamenodeThreads()) { - bpos.join(); - } - } - - void refreshNamenodes(Configuration conf) - throws IOException { - throw new UnsupportedOperationException("TODO(HA)"); -/* - * TODO(HA) - - LOG.info("Refresh request received for nameservices: " - + conf.get(DFS_FEDERATION_NAMESERVICES)); - - // TODO(HA): need to update this for multiple NNs per nameservice - // For now, just list all of the NNs into this set - Map<String, Map<String, InetSocketAddress>> newAddressMap = - DFSUtil.getNNServiceRpcAddresses(conf); - Set<InetSocketAddress> newAddresses = Sets.newHashSet(); - for (ConfiguredNNAddress cnn : 
DFSUtil.flattenAddressMap(newAddressMap)) { - newAddresses.add(cnn.getAddress()); - } - - List<BPOfferService> toShutdown = new ArrayList<BPOfferService>(); - List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>(); - synchronized (refreshNamenodesLock) { - synchronized (this) { - for (InetSocketAddress nnaddr : offerServices.keySet()) { - if (!(newAddresses.contains(nnaddr))) { - toShutdown.add(offerServices.get(nnaddr)); - } - } - for (InetSocketAddress nnaddr : newAddresses) { - if (!(offerServices.containsKey(nnaddr))) { - toStart.add(nnaddr); - } - } - - for (InetSocketAddress nnaddr : toStart) { - BPOfferService bpos = new BPOfferService(nnaddr, DataNode.this); - offerServices.put(bpos.getNNSocketAddress(), bpos); - } - } - - for (BPOfferService bpos : toShutdown) { - bpos.stop(); - bpos.join(); - } - - // stoping the BPOSes causes them to call remove() on their own when they - // clean up. - - // Now start the threads that are not already running. - startAll(); - } - */ - } - - } - volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; public volatile FSDatasetInterface data = null; @@ -779,7 +617,8 @@ public class DataNode extends Configured metrics = DataNodeMetrics.create(conf, getMachineName()); - blockPoolManager = new BlockPoolManager(conf); + blockPoolManager = new BlockPoolManager(this); + blockPoolManager.refreshNamenodes(conf); } /** Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java?rev=1209249&view=auto ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java (added) +++ 
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java Thu Dec 1 21:26:08 2011 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + + +public class TestBlockPoolManager { + private Log LOG = LogFactory.getLog(TestBlockPoolManager.class); + private DataNode mockDN = Mockito.mock(DataNode.class); + private BlockPoolManager bpm; + private StringBuilder log = new StringBuilder(); + private int mockIdx = 1; + + @Before + public void setupBPM() { + bpm = new BlockPoolManager(mockDN){ + + @Override + protected BPOfferService 
createBPOS(List<InetSocketAddress> nnAddrs) { + final int idx = mockIdx++; + doLog("create #" + idx); + final BPOfferService bpos = Mockito.mock(BPOfferService.class); + Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString(); + // Log refreshes + try { + Mockito.doAnswer( + new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + doLog("refresh #" + idx); + return null; + } + }).when(bpos).refreshNNList( + Mockito.<ArrayList<InetSocketAddress>>any()); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Log stops + Mockito.doAnswer( + new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + doLog("stop #" + idx); + bpm.remove(bpos); + return null; + } + }).when(bpos).stop(); + return bpos; + } + }; + } + + private void doLog(String string) { + synchronized(log) { + LOG.info(string); + log.append(string).append("\n"); + } + } + + @Test + public void testSimpleSingleNS() throws Exception { + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, + "hdfs://mock1:8020"); + bpm.refreshNamenodes(conf); + assertEquals("create #1\n", log.toString()); + } + + @Test + public void testFederationRefresh() throws Exception { + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, + "ns1,ns2"); + addNN(conf, "ns1", "mock1:8020"); + addNN(conf, "ns2", "mock1:8020"); + bpm.refreshNamenodes(conf); + assertEquals( + "create #1\n" + + "create #2\n", log.toString()); + log.setLength(0); + + // Remove the first NS + conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, + "ns1"); + bpm.refreshNamenodes(conf); + assertEquals( + "stop #1\n" + + "refresh #2\n", log.toString()); + log.setLength(0); + + // Add back an NS -- this creates a new BPOS since the old + // one for ns2 should have been previously retired + conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, + "ns1,ns2"); + 
bpm.refreshNamenodes(conf); + assertEquals( + "create #3\n" + + "refresh #2\n", log.toString()); + } + + private static void addNN(Configuration conf, String ns, String addr) { + String key = DFSUtil.addKeySuffixes( + DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns); + conf.set(key, addr); + } +}