HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin P. McCabe)
(cherry picked from commit 3b173d95171d01ab55042b1162569d1cf14a8d43)

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

(cherry picked from commit dda1fc169db2e69964cca746be4ff8965eb8b56f)

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
        hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba28192f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba28192f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba28192f

Branch: refs/heads/sjlee/hdfs-merge
Commit: ba28192f9d5a8385283bd717bca494e6981d378f
Parents: 418bd16
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Wed Dec 17 16:41:59 2014 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Wed Aug 12 22:11:55 2015 -0700

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  28 ++--
 .../datanode/fsdataset/impl/FsVolumeList.java   | 138 +++++++++++++------
 .../fsdataset/impl/TestFsDatasetImpl.java       |  70 +++++++++-
 3 files changed, 174 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
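
For context on the change itself: the patch replaces FsVolumeList's volatile,
unmodifiable volume List (which was swapped out while holding the "this" lock)
with an AtomicReference to a plain array that is copied and republished via
compareAndSet on every mutation. Readers take lock-free snapshots; writers
retry on contention. Below is a minimal, self-contained sketch of that
pattern; the class name is illustrative and the volume type is simplified to
String, so this is not the actual Hadoop class.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the copy-on-write pattern this patch applies to FsVolumeList.
class CopyOnWriteVolumeList {
  private final AtomicReference<String[]> volumes =
      new AtomicReference<String[]>(new String[0]);

  // Readers take an immutable snapshot; no lock is ever held.
  List<String> getVolumes() {
    return Collections.unmodifiableList(Arrays.asList(volumes.get()));
  }

  // Writers copy, modify, and publish atomically, retrying if another
  // writer published a new array first.
  void addVolume(String v) {
    while (true) {
      String[] cur = volumes.get();
      String[] next = Arrays.copyOf(cur, cur.length + 1);
      next[cur.length] = v;
      if (volumes.compareAndSet(cur, next)) {
        return;
      }
    }
  }
}

The real FsVolumeList below keeps this same shape, but stores FsVolumeImpl[]
and layers the HDFS-specific logic (shutdown, logging, storage-type
filtering) on top.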


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba28192f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e7fa6d7..0d9f096 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -127,7 +127,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public List<FsVolumeImpl> getVolumes() {
-    return volumes.volumes;
+    return volumes.getVolumes();
   }
 
   @Override
@@ -140,9 +140,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     StorageReport[] reports;
     synchronized (statsLock) {
-      reports = new StorageReport[volumes.volumes.size()];
+      List<FsVolumeImpl> curVolumes = getVolumes();
+      reports = new StorageReport[curVolumes.size()];
       int i = 0;
-      for (FsVolumeImpl volume : volumes.volumes) {
+      for (FsVolumeImpl volume : curVolumes) {
         reports[i++] = new StorageReport(volume.toDatanodeStorage(),
                                          false,
                                          volume.getCapacity(),
@@ -1322,7 +1323,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<String, ArrayList<ReplicaInfo>> uc =
         new HashMap<String, ArrayList<ReplicaInfo>>();
 
-    for (FsVolumeSpi v : volumes.volumes) {
+    List<FsVolumeImpl> curVolumes = getVolumes();
+    for (FsVolumeSpi v : curVolumes) {
       finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
       uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
     }
@@ -1349,7 +1351,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    for (FsVolumeSpi v : volumes.volumes) {
+    for (FsVolumeImpl v : curVolumes) {
       ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
       ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
       blockReportsMap.put(((FsVolumeImpl) v).toDatanodeStorage(),
@@ -2222,7 +2224,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
-    for (FsVolumeImpl volume : volumes.volumes) {
+    for (FsVolumeImpl volume : getVolumes()) {
       long used = 0;
       long free = 0;
       try {
@@ -2256,8 +2258,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override //FsDatasetSpi
   public synchronized void deleteBlockPool(String bpid, boolean force)
       throws IOException {
+    List<FsVolumeImpl> curVolumes = getVolumes();
     if (!force) {
-      for (FsVolumeImpl volume : volumes.volumes) {
+      for (FsVolumeImpl volume : curVolumes) {
         if (!volume.isBPDirEmpty(bpid)) {
          LOG.warn(bpid + " has some block files, cannot delete unless forced");
           throw new IOException("Cannot delete block pool, "
@@ -2265,7 +2268,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
       }
     }
-    for (FsVolumeImpl volume : volumes.volumes) {
+    for (FsVolumeImpl volume : curVolumes) {
       volume.deleteBPDirectories(bpid, force);
     }
   }
@@ -2283,13 +2286,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
       long[] blockIds) throws IOException {
+    List<FsVolumeImpl> curVolumes = getVolumes();
     // List of VolumeIds, one per volume on the datanode
-    List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
+    List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(curVolumes.size());
     // List of indexes into the list of VolumeIds, pointing at the VolumeId of
     // the volume that the block is on
     List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
     // Initialize the list of VolumeIds simply by enumerating the volumes
-    for (int i = 0; i < volumes.volumes.size(); i++) {
+    for (int i = 0; i < curVolumes.size(); i++) {
       blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
     }
     // Determine the index of the VolumeId of each block's volume, by comparing
@@ -2302,7 +2306,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       int volumeIndex = 0;
       if (info != null) {
         FsVolumeSpi blockVolume = info.getVolume();
-        for (FsVolumeImpl volume : volumes.volumes) {
+        for (FsVolumeImpl volume : curVolumes) {
           // This comparison of references should be safe
           if (blockVolume == volume) {
             isValid = true;
@@ -2526,7 +2530,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       // Don't worry about fragmentation for now. We don't expect more than one
       // transient volume per DN.
-      for (FsVolumeImpl v : volumes.volumes) {
+      for (FsVolumeImpl v : getVolumes()) {
         if (v.isTransientStorage()) {
           capacity += v.getCapacity();
           free += v.getAvailable();

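
A note on the FsDatasetImpl hunks above: they all apply one rule, read the
volume list once into a local snapshot (curVolumes) and use that snapshot for
both sizing and iteration. Two independent reads could observe different
lists if another thread adds or removes a volume in between, which matters
wherever an array is sized from one read and filled from another (for
example getStorageReports and getHdfsBlocksMetadata). A toy, runnable
illustration of the snapshot-once rule, assuming the AtomicReference-backed
list sketched earlier (class name and Integer "volumes" are illustrative):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotOnceDemo {
  static final AtomicReference<Integer[]> volumes =
      new AtomicReference<Integer[]>(new Integer[] {10, 20, 30});

  static List<Integer> getVolumes() {
    return Collections.unmodifiableList(Arrays.asList(volumes.get()));
  }

  public static void main(String[] args) {
    // Unsafe shape (what the patch removes): sizing from one read and
    // looping over a second read can race with a concurrent remove,
    // leaving the array too short or partially filled:
    //
    //   int[] out = new int[getVolumes().size()];
    //   for (int v : getVolumes()) { ... }   // second, independent read
    //
    // Safe shape (what the patch adds): snapshot once, use everywhere.
    List<Integer> curVolumes = getVolumes();
    int[] out = new int[curVolumes.size()];
    int i = 0;
    for (int v : curVolumes) {
      out[i++] = v;
    }
    System.out.println(Arrays.toString(out)); // prints [10, 20, 30]
  }
}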
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba28192f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 55329ae..9483444 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -31,11 +34,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
-  /**
-   * Read access to this unmodifiable list is not synchronized.
-   * This list is replaced on modification holding "this" lock.
-   */
-  volatile List<FsVolumeImpl> volumes = null;
+  private final AtomicReference<FsVolumeImpl[]> volumes =
+      new AtomicReference<FsVolumeImpl[]>(new FsVolumeImpl[0]);
   private Object checkDirsMutex = new Object();
 
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
@@ -50,19 +50,28 @@ class FsVolumeList {
   int numberOfFailedVolumes() {
     return numFailedVolumes;
   }
-  
+
+  /**
+   * Return an immutable list view of all the volumes.
+   */
+  List<FsVolumeImpl> getVolumes() {
+    return Collections.unmodifiableList(Arrays.asList(volumes.get()));
+  }
+
   /** 
-   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
-   * by a single thread and next volume is chosen with no concurrent
-   * update to {@link #volumes}.
+   * Get next volume.
+   *
    * @param blockSize free space needed on the volume
    * @param storageType the desired {@link StorageType} 
    * @return next volume to store the block in.
    */
-  synchronized FsVolumeImpl getNextVolume(StorageType storageType,
-      long blockSize) throws IOException {
-    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
-    for(FsVolumeImpl v : volumes) {
+  FsVolumeImpl getNextVolume(StorageType storageType, long blockSize)
+      throws IOException {
+    // Get a snapshot of currently available volumes.
+    final FsVolumeImpl[] curVolumes = volumes.get();
+    final List<FsVolumeImpl> list =
+        new ArrayList<FsVolumeImpl>(curVolumes.length);
+    for(FsVolumeImpl v : curVolumes) {
       if (v.getStorageType() == storageType) {
         list.add(v);
       }
@@ -71,16 +80,17 @@ class FsVolumeList {
   }
 
   /**
-   * Get next volume. Synchronized to ensure {@link #curVolume} is updated
-   * by a single thread and next volume is chosen with no concurrent
-   * update to {@link #volumes}.
+   * Get next volume.
+   *
    * @param blockSize free space needed on the volume
    * @return next volume to store the block in.
    */
-  synchronized FsVolumeImpl getNextTransientVolume(
-      long blockSize) throws IOException {
-    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
-    for(FsVolumeImpl v : volumes) {
+  FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException {
+    // Get a snapshot of currently available volumes.
+    final List<FsVolumeImpl> curVolumes = getVolumes();
+    final List<FsVolumeImpl> list =
+        new ArrayList<FsVolumeImpl>(curVolumes.size());
+    for(FsVolumeImpl v : curVolumes) {
       if (v.isTransientStorage()) {
         list.add(v);
       }
@@ -90,7 +100,7 @@ class FsVolumeList {
 
   long getDfsUsed() throws IOException {
     long dfsUsed = 0L;
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       dfsUsed += v.getDfsUsed();
     }
     return dfsUsed;
@@ -98,7 +108,7 @@ class FsVolumeList {
 
   long getBlockPoolUsed(String bpid) throws IOException {
     long dfsUsed = 0L;
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       dfsUsed += v.getBlockPoolUsed(bpid);
     }
     return dfsUsed;
@@ -106,7 +116,7 @@ class FsVolumeList {
 
   long getCapacity() {
     long capacity = 0L;
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       capacity += v.getCapacity();
     }
     return capacity;
@@ -114,7 +124,7 @@ class FsVolumeList {
     
   long getRemaining() throws IOException {
     long remaining = 0L;
-    for (FsVolumeSpi vol : volumes) {
+    for (FsVolumeSpi vol : volumes.get()) {
       remaining += vol.getAvailable();
     }
     return remaining;
@@ -128,7 +138,7 @@ class FsVolumeList {
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
-    for (final FsVolumeImpl v : volumes) {
+    for (final FsVolumeImpl v : volumes.get()) {
       Thread t = new Thread() {
         public void run() {
           try {
@@ -177,7 +187,7 @@ class FsVolumeList {
       ArrayList<FsVolumeImpl> removedVols = null;
       
       // Make a copy of volumes for performing modification 
-      final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+      final List<FsVolumeImpl> volumeList = getVolumes();
 
       for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
         final FsVolumeImpl fsv = i.next();
@@ -189,7 +199,7 @@ class FsVolumeList {
             removedVols = new ArrayList<FsVolumeImpl>(1);
           }
           removedVols.add(fsv);
-          removeVolume(fsv.getBasePath());
+          removeVolume(fsv);
           numFailedVolumes++;
         }
       }
@@ -212,31 +222,71 @@ class FsVolumeList {
    * Dynamically add new volumes to the existing volumes that this DN manages.
    * @param newVolume the instance of new FsVolumeImpl.
    */
-  synchronized void addVolume(FsVolumeImpl newVolume) {
+  void addVolume(FsVolumeImpl newVolume) {
     // Make a copy of volumes to add new volumes.
-    final List<FsVolumeImpl> volumeList = volumes == null ?
-        new ArrayList<FsVolumeImpl>() :
-        new ArrayList<FsVolumeImpl>(volumes);
-    volumeList.add(newVolume);
-    volumes = Collections.unmodifiableList(volumeList);
+    while (true) {
+      final FsVolumeImpl[] curVolumes = volumes.get();
+      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
+      volumeList.add(newVolume);
+      if (volumes.compareAndSet(curVolumes,
+          volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+        break;
+      } else {
+        if (FsDatasetImpl.LOG.isDebugEnabled()) {
+          FsDatasetImpl.LOG.debug(
+              "The volume list has been changed concurrently, " +
+                  "retry to add volume: " + newVolume);
+        }
+      }
+    }
+
     FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
   }
 
   /**
-   * Dynamically remove volume to the list.
+   * Dynamically remove a volume from the list.
+   * @param target the volume instance to be removed.
+   */
+  private void removeVolume(FsVolumeImpl target) {
+    while (true) {
+      final FsVolumeImpl[] curVolumes = volumes.get();
+      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
+      if (volumeList.remove(target)) {
+        if (volumes.compareAndSet(curVolumes,
+            volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+          target.shutdown();
+          FsDatasetImpl.LOG.info("Removed volume: " + target);
+          break;
+        } else {
+          if (FsDatasetImpl.LOG.isDebugEnabled()) {
+            FsDatasetImpl.LOG.debug(
+                "The volume list has been changed concurrently, " +
+                "retry to remove volume: " + target);
+          }
+        }
+      } else {
+        if (FsDatasetImpl.LOG.isDebugEnabled()) {
+          FsDatasetImpl.LOG.debug("Volume " + target +
+              " does not exist or has been removed by others.");
+        }
+        break;
+      }
+    }
+  }
+
+  /**
+   * Dynamically remove a volume from the list.
    * @param volume the volume to be removed.
    */
-  synchronized void removeVolume(String volume) {
+  void removeVolume(String volume) {
     // Make a copy of volumes to remove one volume.
-    final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+    final FsVolumeImpl[] curVolumes = volumes.get();
+    final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
     for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
       FsVolumeImpl fsVolume = it.next();
       if (fsVolume.getBasePath().equals(volume)) {
-        fsVolume.shutdown();
-        it.remove();
-        volumes = Collections.unmodifiableList(volumeList);
-        FsDatasetImpl.LOG.info("Removed volume: " + volume);
-        break;
+        // Make sure the removed volume is the one in the curVolumes.
+        removeVolume(fsVolume);
       }
     }
   }
@@ -247,7 +297,7 @@ class FsVolumeList {
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
-    for (final FsVolumeImpl v : volumes) {
+    for (final FsVolumeImpl v : volumes.get()) {
       Thread t = new Thread() {
         public void run() {
           try {
@@ -285,13 +335,13 @@ class FsVolumeList {
   }
   
   void removeBlockPool(String bpid) {
-    for (FsVolumeImpl v : volumes) {
+    for (FsVolumeImpl v : volumes.get()) {
       v.shutdownBlockPool(bpid);
     }
   }
 
   void shutdown() {
-    for (FsVolumeImpl volume : volumes) {
+    for (FsVolumeImpl volume : volumes.get()) {
       if(volume != null) {
         volume.shutdown();
       }

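
On the addVolume()/removeVolume() loops above: each mutation copies the
current array, edits the copy, and publishes it with compareAndSet; a writer
that loses the race simply retries against the freshly published array, so
no add or remove is ever lost, and a volume already removed by another
thread is shut down at most once. A small runnable check of the
no-lost-updates property under contention (names illustrative, volumes
simplified to Strings, not the actual Hadoop code):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

public class CasRetryDemo {
  static final AtomicReference<String[]> volumes =
      new AtomicReference<String[]>(new String[0]);

  static void addVolume(String v) {
    while (true) {
      String[] cur = volumes.get();
      String[] next = Arrays.copyOf(cur, cur.length + 1);
      next[cur.length] = v;
      if (volumes.compareAndSet(cur, next)) {
        return; // published; otherwise another writer won, so retry
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread[] writers = new Thread[4];
    for (int t = 0; t < writers.length; t++) {
      final int id = t;
      writers[t] = new Thread(new Runnable() {
        public void run() {
          for (int i = 0; i < 1000; i++) {
            addVolume("data" + id + "-" + i);
          }
        }
      });
      writers[t].start();
    }
    for (Thread w : writers) {
      w.join();
    }
    // Every add survives the contention: prints 4000.
    System.out.println(volumes.get().length);
  }
}

With four threads adding 1000 entries each, the final array always holds
4000 entries; a plain read-copy-store without compareAndSet would
intermittently drop updates.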
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba28192f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 28951c3..60fc238 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -34,12 +34,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,13 +52,19 @@ import java.util.List;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -102,9 +111,9 @@ public class TestFsDatasetImpl {
 
   @Before
   public void setUp() throws IOException {
-    datanode = Mockito.mock(DataNode.class);
-    storage = Mockito.mock(DataStorage.class);
-    scanner = Mockito.mock(DataBlockScanner.class);
+    datanode = mock(DataNode.class);
+    storage = mock(DataStorage.class);
+    scanner = mock(DataBlockScanner.class);
     this.conf = new Configuration();
     final DNConf dnConf = new DNConf(conf);
 
@@ -204,8 +213,8 @@ public class TestFsDatasetImpl {
 
   @Test
   public void testDuplicateReplicaResolution() throws IOException {
-    FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class);
-    FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class);
+    FsVolumeImpl fsv1 = mock(FsVolumeImpl.class);
+    FsVolumeImpl fsv2 = mock(FsVolumeImpl.class);
 
     File f1 = new File("d1/block");
     File f2 = new File("d2/block");
@@ -232,4 +241,53 @@ public class TestFsDatasetImpl {
     assertSame(replica,
         BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
   }
+
+  @Test(timeout = 5000)
+  public void testChangeVolumeWithRunningCheckDirs() throws IOException {
+    RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
+        new RoundRobinVolumeChoosingPolicy<FsVolumeImpl>();
+    final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    final List<FsVolumeImpl> oldVolumes = new ArrayList<FsVolumeImpl>();
+
+    // Initialize FsVolumeList with 5 mock volumes.
+    final int NUM_VOLUMES = 5;
+    for (int i = 0; i < NUM_VOLUMES; i++) {
+      FsVolumeImpl volume = mock(FsVolumeImpl.class);
+      oldVolumes.add(volume);
+      when(volume.getBasePath()).thenReturn("data" + i);
+      volumeList.addVolume(volume);
+    }
+
+    // When checkDirs() is called on the 2nd volume, another "thread" removes
+    // the 5th volume and adds a new volume. This should not affect the
+    // running checkDirs() call.
+    final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
+    FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        volumeList.removeVolume("data4");
+        volumeList.addVolume(newVolume);
+        return null;
+      }
+    }).when(blockedVolume).checkDirs();
+
+    FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
+    doThrow(new DiskChecker.DiskErrorException("broken"))
+        .when(brokenVolume).checkDirs();
+
+    volumeList.checkDirs();
+
+    // Since FsVolumeList#checkDirs() takes a snapshot of the list of volumes
+    // before running removeVolume(), it is expected to run checkDirs() on all
+    // of the old volumes.
+    for (FsVolumeImpl volume : oldVolumes) {
+      verify(volume).checkDirs();
+    }
+    // The new volume is not visible to the running checkDirs().
+    verify(newVolume, never()).checkDirs();
+    assertTrue(volumeList.getVolumes().contains(newVolume));
+    assertFalse(volumeList.getVolumes().contains(brokenVolume));
+    assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
+  }
 }
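
The new test simulates a concurrent modification without real threads: a
Mockito Answer installed on the 2nd volume's checkDirs() mutates the volume
list from inside the scan, which deterministically exercises the snapshot
semantics (all five original volumes are checked; the newly added one is
not). A stripped-down sketch of that technique, using a hypothetical Task
interface in place of the HDFS types:

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class MidCallMutationSketch {
  interface Task {
    void run() throws Exception;
  }

  public static void main(String[] args) throws Exception {
    final Task other = mock(Task.class);
    Task blocked = mock(Task.class);
    // Run arbitrary code at the exact moment blocked.run() is invoked,
    // standing in for "another thread" mutating shared state mid-call.
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock inv) throws Throwable {
        other.run(); // the "concurrent" mutation
        return null;
      }
    }).when(blocked).run();

    blocked.run();
    verify(other).run(); // the mutation ran during the blocked call
  }
}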
