Repository: hadoop
Updated Branches:
refs/heads/branch-2 d0d42c4e3 -> dda1fc169
HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin P. McCabe)
(cherry picked from commit 3b173d95171d01ab55042b1162569d1cf14a8d43)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dda1fc16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dda1fc16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dda1fc16
Branch: refs/heads/branch-2
Commit: dda1fc169db2e69964cca746be4ff8965eb8b56f
Parents: d0d42c4
Author: Colin Patrick Mccabe <[email protected]>
Authored: Wed Dec 17 16:41:59 2014 -0800
Committer: Colin Patrick Mccabe <[email protected]>
Committed: Wed Dec 17 16:43:33 2014 -0800
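
For context on the pattern this patch introduces: FsVolumeList previously
published its volume list as a volatile unmodifiable List and serialized
writers with "synchronized"; this change switches to an
AtomicReference<FsVolumeImpl[]>, so readers take lock-free snapshots and
writers publish modified copies with compareAndSet, retrying on contention.
Below is a minimal, self-contained sketch of that copy-on-write idiom using
only the JDK; the class name and its String elements are hypothetical
stand-ins for FsVolumeList and FsVolumeImpl, not the patched code itself.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;
  import java.util.concurrent.atomic.AtomicReference;

  class CowVolumeList {
    private final AtomicReference<String[]> volumes =
        new AtomicReference<>(new String[0]);

    // Lock-free read: an immutable view over the current array snapshot.
    List<String> getVolumes() {
      return Collections.unmodifiableList(Arrays.asList(volumes.get()));
    }

    // Copy-on-write add: copy, modify, publish with compareAndSet, and
    // retry if another writer swapped the array in first.
    void addVolume(String v) {
      while (true) {
        String[] cur = volumes.get();
        String[] next = Arrays.copyOf(cur, cur.length + 1);
        next[cur.length] = v;
        if (volumes.compareAndSet(cur, next)) {
          return;
        }
      }
    }

    // Copy-on-write remove: a no-op if another thread removed it first.
    void removeVolume(String v) {
      while (true) {
        String[] cur = volumes.get();
        List<String> list = new ArrayList<>(Arrays.asList(cur));
        if (!list.remove(v)) {
          return;
        }
        if (volumes.compareAndSet(cur, list.toArray(new String[0]))) {
          return;
        }
      }
    }
  }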
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../datanode/fsdataset/impl/FsDatasetImpl.java | 28 ++--
.../datanode/fsdataset/impl/FsVolumeList.java | 136 +++++++++++++------
.../fsdataset/impl/TestFsDatasetImpl.java | 66 ++++++++-
4 files changed, 173 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda1fc16/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2e6ab8a..49d834d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -208,6 +208,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7528. Consolidate symlink-related implementation into a single class.
(wheat9)
+ HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin
+ P. McCabe)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda1fc16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f91e05e..30aecd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -127,7 +127,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() {
- return volumes.volumes;
+ return volumes.getVolumes();
}
@Override
@@ -140,9 +140,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
StorageReport[] reports;
synchronized (statsLock) {
- reports = new StorageReport[volumes.volumes.size()];
+ List<FsVolumeImpl> curVolumes = getVolumes();
+ reports = new StorageReport[curVolumes.size()];
int i = 0;
- for (FsVolumeImpl volume : volumes.volumes) {
+ for (FsVolumeImpl volume : curVolumes) {
reports[i++] = new StorageReport(volume.toDatanodeStorage(),
false,
volume.getCapacity(),
@@ -1389,7 +1390,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Map<String, ArrayList<ReplicaInfo>> uc =
new HashMap<String, ArrayList<ReplicaInfo>>();
- for (FsVolumeSpi v : volumes.volumes) {
+ List<FsVolumeImpl> curVolumes = getVolumes();
+ for (FsVolumeSpi v : curVolumes) {
finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
}
@@ -1416,7 +1418,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- for (FsVolumeSpi v : volumes.volumes) {
+ for (FsVolumeImpl v : curVolumes) {
ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
blockReportsMap.put(((FsVolumeImpl) v).toDatanodeStorage(),
@@ -2284,7 +2286,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
- for (FsVolumeImpl volume : volumes.volumes) {
+ for (FsVolumeImpl volume : getVolumes()) {
long used = 0;
long free = 0;
try {
@@ -2318,8 +2320,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override //FsDatasetSpi
public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException {
+ List<FsVolumeImpl> curVolumes = getVolumes();
if (!force) {
- for (FsVolumeImpl volume : volumes.volumes) {
+ for (FsVolumeImpl volume : curVolumes) {
if (!volume.isBPDirEmpty(bpid)) {
LOG.warn(bpid + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
@@ -2327,7 +2330,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
}
- for (FsVolumeImpl volume : volumes.volumes) {
+ for (FsVolumeImpl volume : curVolumes) {
volume.deleteBPDirectories(bpid, force);
}
}
@@ -2345,13 +2348,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException {
+ List<FsVolumeImpl> curVolumes = getVolumes();
// List of VolumeIds, one per volume on the datanode
- List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
+ List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
// Initialize the list of VolumeIds simply by enumerating the volumes
- for (int i = 0; i < volumes.volumes.size(); i++) {
+ for (int i = 0; i < curVolumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
@@ -2364,7 +2368,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
int volumeIndex = 0;
if (info != null) {
FsVolumeSpi blockVolume = info.getVolume();
- for (FsVolumeImpl volume : volumes.volumes) {
+ for (FsVolumeImpl volume : curVolumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
@@ -2617,7 +2621,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Don't worry about fragmentation for now. We don't expect more than one
// transient volume per DN.
- for (FsVolumeImpl v : volumes.volumes) {
+ for (FsVolumeImpl v : getVolumes()) {
if (v.isTransientStorage()) {
capacity += v.getCapacity();
free += v.getAvailable();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda1fc16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 55329ae..b506710 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -31,11 +34,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
class FsVolumeList {
- /**
- * Read access to this unmodifiable list is not synchronized.
- * This list is replaced on modification holding "this" lock.
- */
- volatile List<FsVolumeImpl> volumes = null;
+ private final AtomicReference<FsVolumeImpl[]> volumes =
+ new AtomicReference<>(new FsVolumeImpl[0]);
private Object checkDirsMutex = new Object();
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
@@ -50,19 +50,27 @@ class FsVolumeList {
int numberOfFailedVolumes() {
return numFailedVolumes;
}
-
+
+ /**
+ * Return an immutable list view of all the volumes.
+ */
+ List<FsVolumeImpl> getVolumes() {
+ return Collections.unmodifiableList(Arrays.asList(volumes.get()));
+ }
+
/**
- * Get next volume. Synchronized to ensure {@link #curVolume} is updated
- * by a single thread and next volume is chosen with no concurrent
- * update to {@link #volumes}.
+ * Get next volume.
+ *
* @param blockSize free space needed on the volume
* @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
- synchronized FsVolumeImpl getNextVolume(StorageType storageType,
- long blockSize) throws IOException {
- final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
- for(FsVolumeImpl v : volumes) {
+ FsVolumeImpl getNextVolume(StorageType storageType, long blockSize)
+ throws IOException {
+ // Get a snapshot of currently available volumes.
+ final FsVolumeImpl[] curVolumes = volumes.get();
+ final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
+ for(FsVolumeImpl v : curVolumes) {
if (v.getStorageType() == storageType) {
list.add(v);
}
@@ -71,16 +79,16 @@ class FsVolumeList {
}
/**
- * Get next volume. Synchronized to ensure {@link #curVolume} is updated
- * by a single thread and next volume is chosen with no concurrent
- * update to {@link #volumes}.
+ * Get next volume.
+ *
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
- synchronized FsVolumeImpl getNextTransientVolume(
- long blockSize) throws IOException {
- final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
- for(FsVolumeImpl v : volumes) {
+ FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException {
+ // Get a snapshot of currently available volumes.
+ final List<FsVolumeImpl> curVolumes = getVolumes();
+ final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size());
+ for(FsVolumeImpl v : curVolumes) {
if (v.isTransientStorage()) {
list.add(v);
}
@@ -90,7 +98,7 @@ class FsVolumeList {
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
- for (FsVolumeImpl v : volumes) {
+ for (FsVolumeImpl v : volumes.get()) {
dfsUsed += v.getDfsUsed();
}
return dfsUsed;
@@ -98,7 +106,7 @@ class FsVolumeList {
long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L;
- for (FsVolumeImpl v : volumes) {
+ for (FsVolumeImpl v : volumes.get()) {
dfsUsed += v.getBlockPoolUsed(bpid);
}
return dfsUsed;
@@ -106,7 +114,7 @@ class FsVolumeList {
long getCapacity() {
long capacity = 0L;
- for (FsVolumeImpl v : volumes) {
+ for (FsVolumeImpl v : volumes.get()) {
capacity += v.getCapacity();
}
return capacity;
@@ -114,7 +122,7 @@ class FsVolumeList {
long getRemaining() throws IOException {
long remaining = 0L;
- for (FsVolumeSpi vol : volumes) {
+ for (FsVolumeSpi vol : volumes.get()) {
remaining += vol.getAvailable();
}
return remaining;
@@ -128,7 +136,7 @@ class FsVolumeList {
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> replicaAddingThreads = new ArrayList<Thread>();
- for (final FsVolumeImpl v : volumes) {
+ for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try {
@@ -177,7 +185,7 @@ class FsVolumeList {
ArrayList<FsVolumeImpl> removedVols = null;
// Make a copy of volumes for performing modification
- final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+ final List<FsVolumeImpl> volumeList = getVolumes();
for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
final FsVolumeImpl fsv = i.next();
@@ -189,7 +197,7 @@ class FsVolumeList {
removedVols = new ArrayList<FsVolumeImpl>(1);
}
removedVols.add(fsv);
- removeVolume(fsv.getBasePath());
+ removeVolume(fsv);
numFailedVolumes++;
}
}
@@ -212,31 +220,71 @@ class FsVolumeList {
* Dynamically add new volumes to the existing volumes that this DN manages.
* @param newVolume the instance of new FsVolumeImpl.
*/
- synchronized void addVolume(FsVolumeImpl newVolume) {
+ void addVolume(FsVolumeImpl newVolume) {
// Make a copy of volumes to add new volumes.
- final List<FsVolumeImpl> volumeList = volumes == null ?
- new ArrayList<FsVolumeImpl>() :
- new ArrayList<FsVolumeImpl>(volumes);
- volumeList.add(newVolume);
- volumes = Collections.unmodifiableList(volumeList);
+ while (true) {
+ final FsVolumeImpl[] curVolumes = volumes.get();
+ final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
+ volumeList.add(newVolume);
+ if (volumes.compareAndSet(curVolumes,
+ volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+ break;
+ } else {
+ if (FsDatasetImpl.LOG.isDebugEnabled()) {
+ FsDatasetImpl.LOG.debug(
+ "The volume list has been changed concurrently, " +
+ "retry to remove volume: " + newVolume);
+ }
+ }
+ }
+
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
}
/**
- * Dynamically remove volume to the list.
+ * Dynamically remove a volume from the list.
+ * @param target the volume instance to be removed.
+ */
+ private void removeVolume(FsVolumeImpl target) {
+ while (true) {
+ final FsVolumeImpl[] curVolumes = volumes.get();
+ final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
+ if (volumeList.remove(target)) {
+ if (volumes.compareAndSet(curVolumes,
+ volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+ target.shutdown();
+ FsDatasetImpl.LOG.info("Removed volume: " + target);
+ break;
+ } else {
+ if (FsDatasetImpl.LOG.isDebugEnabled()) {
+ FsDatasetImpl.LOG.debug(
+ "The volume list has been changed concurrently, " +
+ "retry to remove volume: " + target);
+ }
+ }
+ } else {
+ if (FsDatasetImpl.LOG.isDebugEnabled()) {
+ FsDatasetImpl.LOG.debug("Volume " + target +
+ " does not exist or is removed by others.");
+ }
+ break;
+ }
+ }
+ }
+
+ /**
+ * Dynamically remove a volume from the list, identified by its base path.
+ * @param volume the base path of the volume to be removed.
*/
- synchronized void removeVolume(String volume) {
+ void removeVolume(String volume) {
// Make a copy of volumes to remove one volume.
- final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+ final FsVolumeImpl[] curVolumes = volumes.get();
+ final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
FsVolumeImpl fsVolume = it.next();
if (fsVolume.getBasePath().equals(volume)) {
- fsVolume.shutdown();
- it.remove();
- volumes = Collections.unmodifiableList(volumeList);
- FsDatasetImpl.LOG.info("Removed volume: " + volume);
- break;
+ // Make sure the volume removed is the instance present in the current list.
+ removeVolume(fsVolume);
}
}
}
@@ -247,7 +295,7 @@ class FsVolumeList {
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
- for (final FsVolumeImpl v : volumes) {
+ for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try {
@@ -285,13 +333,13 @@ class FsVolumeList {
}
void removeBlockPool(String bpid) {
- for (FsVolumeImpl v : volumes) {
+ for (FsVolumeImpl v : volumes.get()) {
v.shutdownBlockPool(bpid);
}
}
void shutdown() {
- for (FsVolumeImpl volume : volumes) {
+ for (FsVolumeImpl volume : volumes.get()) {
if(volume != null) {
volume.shutdown();
}
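
The property the new test below exercises is snapshot stability: an iteration
started from volumes.get() is unaffected by concurrent addVolume() and
removeVolume() calls, which only swap in a replacement array. A minimal
standalone demo of that behavior, again with hypothetical names rather than
the Hadoop classes:

  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;
  import java.util.concurrent.atomic.AtomicReference;

  public class SnapshotDemo {
    public static void main(String[] args) {
      AtomicReference<String[]> vols =
          new AtomicReference<>(new String[] {"data0", "data1", "data2"});

      // Take a snapshot before mutating, as FsVolumeList#checkDirs() does.
      List<String> snapshot =
          Collections.unmodifiableList(Arrays.asList(vols.get()));

      // A concurrent writer swaps in a new array (compare the test's
      // doAnswer() callback, which removes one volume and adds another).
      vols.set(new String[] {"data0", "data1", "dataNew"});

      // The snapshot still sees the old volume set...
      for (String v : snapshot) {
        System.out.println(v); // prints data0, data1, data2
      }
      // ...while fresh readers observe the updated array.
      System.out.println(Arrays.toString(vols.get()));
    }
  }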
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda1fc16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 956ab78..aa4b68c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -32,12 +32,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
@@ -47,11 +50,17 @@ import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -98,9 +107,9 @@ public class TestFsDatasetImpl {
@Before
public void setUp() throws IOException {
- datanode = Mockito.mock(DataNode.class);
- storage = Mockito.mock(DataStorage.class);
- scanner = Mockito.mock(DataBlockScanner.class);
+ datanode = mock(DataNode.class);
+ storage = mock(DataStorage.class);
+ scanner = mock(DataBlockScanner.class);
this.conf = new Configuration();
final DNConf dnConf = new DNConf(conf);
@@ -197,4 +206,53 @@ public class TestFsDatasetImpl {
verify(scanner, times(BLOCK_POOL_IDS.length))
.deleteBlocks(anyString(), any(Block[].class));
}
+
+ @Test(timeout = 5000)
+ public void testChangeVolumeWithRunningCheckDirs() throws IOException {
+ RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
+ new RoundRobinVolumeChoosingPolicy<>();
+ final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+ final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
+
+ // Initialize FsVolumeList with 5 mock volumes.
+ final int NUM_VOLUMES = 5;
+ for (int i = 0; i < NUM_VOLUMES; i++) {
+ FsVolumeImpl volume = mock(FsVolumeImpl.class);
+ oldVolumes.add(volume);
+ when(volume.getBasePath()).thenReturn("data" + i);
+ volumeList.addVolume(volume);
+ }
+
+ // When checkDirs() is called on the 2nd volume, another "thread" removes
+ // the 5th volume and adds a new one. This does not affect the running
+ // checkDirs() iteration.
+ final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
+ FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock)
+ throws Throwable {
+ volumeList.removeVolume("data4");
+ volumeList.addVolume(newVolume);
+ return null;
+ }
+ }).when(blockedVolume).checkDirs();
+
+ FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
+ doThrow(new DiskChecker.DiskErrorException("broken"))
+ .when(brokenVolume).checkDirs();
+
+ volumeList.checkDirs();
+
+ // Since FsVolumeList#checkDirs() takes a snapshot of the list of volumes
+ // before removeVolume() runs, it should still run checkDirs() on all
+ // the old volumes.
+ for (FsVolumeImpl volume : oldVolumes) {
+ verify(volume).checkDirs();
+ }
+ // New volume is not visible to checkDirs() process.
+ verify(newVolume, never()).checkDirs();
+ assertTrue(volumeList.getVolumes().contains(newVolume));
+ assertFalse(volumeList.getVolumes().contains(brokenVolume));
+ assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
+ }
}