HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.  (Walter Su via 
lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/533a2be5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/533a2be5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/533a2be5

Branch: refs/heads/HDFS-8966
Commit: 533a2be5ac7c7f0473fdd24d6201582d08964e21
Parents: 600ad7b
Author: Lei Xu <l...@apache.org>
Authored: Fri Oct 23 13:52:59 2015 -0700
Committer: Lei Xu <l...@apache.org>
Committed: Fri Oct 23 13:52:59 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../datanode/fsdataset/impl/FsVolumeList.java   | 109 ++++++-------------
 2 files changed, 38 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/533a2be5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cf6558f..066ae02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1560,6 +1560,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-7087. Ability to list /.reserved. (Xiao Chen via wang)
 
+    HDFS-9264. Minor cleanup of operations on FsVolumeList#volumes.
+    (Walter Su via lei)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/533a2be5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a73e129..608ee29 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,9 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.CopyOnWriteArrayList;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -46,8 +44,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
-  private final AtomicReference<FsVolumeImpl[]> volumes =
-      new AtomicReference<>(new FsVolumeImpl[0]);
+  private final CopyOnWriteArrayList<FsVolumeImpl> volumes =
+      new CopyOnWriteArrayList<>();
   // Tracks volume failures, sorted by volume path.
   private final Map<String, VolumeFailureInfo> volumeFailureInfos =
       Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
@@ -71,7 +69,7 @@ class FsVolumeList {
    * Return an immutable list view of all the volumes.
    */
   List<FsVolumeImpl> getVolumes() {
-    return Collections.unmodifiableList(Arrays.asList(volumes.get()));
+    return Collections.unmodifiableList(volumes);
   }
 
   private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long 
blockSize)
@@ -98,10 +96,8 @@ class FsVolumeList {
    */
   FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
       throws IOException {
-    // Get a snapshot of currently available volumes.
-    final FsVolumeImpl[] curVolumes = volumes.get();
-    final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
-    for(FsVolumeImpl v : curVolumes) {
+    final List<FsVolumeImpl> list = new ArrayList<>(volumes.size());
+    for(FsVolumeImpl v : volumes) {
       if (v.getStorageType() == storageType) {
         list.add(v);
       }
@@ -129,7 +125,7 @@ class FsVolumeList {
 
   long getDfsUsed() throws IOException {
     long dfsUsed = 0L;
-    for (FsVolumeImpl v : volumes.get()) {
+    for (FsVolumeImpl v : volumes) {
       try(FsVolumeReference ref = v.obtainReference()) {
         dfsUsed += v.getDfsUsed();
       } catch (ClosedChannelException e) {
@@ -141,7 +137,7 @@ class FsVolumeList {
 
   long getBlockPoolUsed(String bpid) throws IOException {
     long dfsUsed = 0L;
-    for (FsVolumeImpl v : volumes.get()) {
+    for (FsVolumeImpl v : volumes) {
       try (FsVolumeReference ref = v.obtainReference()) {
         dfsUsed += v.getBlockPoolUsed(bpid);
       } catch (ClosedChannelException e) {
@@ -153,7 +149,7 @@ class FsVolumeList {
 
   long getCapacity() {
     long capacity = 0L;
-    for (FsVolumeImpl v : volumes.get()) {
+    for (FsVolumeImpl v : volumes) {
       try (FsVolumeReference ref = v.obtainReference()) {
         capacity += v.getCapacity();
       } catch (IOException e) {
@@ -165,7 +161,7 @@ class FsVolumeList {
     
   long getRemaining() throws IOException {
     long remaining = 0L;
-    for (FsVolumeSpi vol : volumes.get()) {
+    for (FsVolumeSpi vol : volumes) {
       try (FsVolumeReference ref = vol.obtainReference()) {
         remaining += vol.getAvailable();
       } catch (ClosedChannelException e) {
@@ -183,7 +179,7 @@ class FsVolumeList {
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
-    for (final FsVolumeImpl v : volumes.get()) {
+    for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
         public void run() {
           try (FsVolumeReference ref = v.obtainReference()) {
@@ -267,7 +263,7 @@ class FsVolumeList {
 
   @Override
   public String toString() {
-    return Arrays.toString(volumes.get());
+    return volumes.toString();
   }
 
   /**
@@ -277,21 +273,7 @@ class FsVolumeList {
    */
   void addVolume(FsVolumeReference ref) {
     FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
-    while (true) {
-      final FsVolumeImpl[] curVolumes = volumes.get();
-      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
-      volumeList.add(volume);
-      if (volumes.compareAndSet(curVolumes,
-          volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
-        break;
-      } else {
-        if (FsDatasetImpl.LOG.isDebugEnabled()) {
-          FsDatasetImpl.LOG.debug(
-              "The volume list has been changed concurrently, " +
-                  "retry to remove volume: " + ref.getVolume().getStorageID());
-        }
-      }
-    }
+    volumes.add(volume);
     if (blockScanner != null) {
       blockScanner.addVolumeScanner(ref);
     } else {
@@ -311,37 +293,22 @@ class FsVolumeList {
    * @param target the volume instance to be removed.
    */
   private void removeVolume(FsVolumeImpl target) {
-    while (true) {
-      final FsVolumeImpl[] curVolumes = volumes.get();
-      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
-      if (volumeList.remove(target)) {
-        if (volumes.compareAndSet(curVolumes,
-            volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
-          if (blockScanner != null) {
-            blockScanner.removeVolumeScanner(target);
-          }
-          try {
-            target.closeAndWait();
-          } catch (IOException e) {
-            FsDatasetImpl.LOG.warn(
-                "Error occurs when waiting volume to close: " + target, e);
-          }
-          target.shutdown();
-          FsDatasetImpl.LOG.info("Removed volume: " + target);
-          break;
-        } else {
-          if (FsDatasetImpl.LOG.isDebugEnabled()) {
-            FsDatasetImpl.LOG.debug(
-                "The volume list has been changed concurrently, " +
-                "retry to remove volume: " + target);
-          }
-        }
-      } else {
-        if (FsDatasetImpl.LOG.isDebugEnabled()) {
-          FsDatasetImpl.LOG.debug("Volume " + target +
-              " does not exist or is removed by others.");
-        }
-        break;
+    if (volumes.remove(target)) {
+      if (blockScanner != null) {
+        blockScanner.removeVolumeScanner(target);
+      }
+      try {
+        target.closeAndWait();
+      } catch (IOException e) {
+        FsDatasetImpl.LOG.warn(
+            "Error occurs when waiting volume to close: " + target, e);
+      }
+      target.shutdown();
+      FsDatasetImpl.LOG.info("Removed volume: " + target);
+    } else {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Volume " + target +
+            " does not exist or is removed by others.");
       }
     }
   }
@@ -352,16 +319,10 @@ class FsVolumeList {
    * @param clearFailure set true to remove failure info for this volume.
    */
   void removeVolume(File volume, boolean clearFailure) {
-    // Make a copy of volumes to remove one volume.
-    final FsVolumeImpl[] curVolumes = volumes.get();
-    final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
-    for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
-      FsVolumeImpl fsVolume = it.next();
-      String basePath, targetPath;
-      basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
-      targetPath = volume.getAbsolutePath();
+    for (FsVolumeImpl fsVolume : volumes) {
+      String basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
+      String targetPath = volume.getAbsolutePath();
       if (basePath.equals(targetPath)) {
-        // Make sure the removed volume is the one in the curVolumes.
         removeVolume(fsVolume);
       }
     }
@@ -397,7 +358,7 @@ class FsVolumeList {
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
-    for (final FsVolumeImpl v : volumes.get()) {
+    for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
         public void run() {
           try (FsVolumeReference ref = v.obtainReference()) {
@@ -438,13 +399,13 @@ class FsVolumeList {
   
   void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
       blocksPerVolume) {
-    for (FsVolumeImpl v : volumes.get()) {
+    for (FsVolumeImpl v : volumes) {
       v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
     }
   }
 
   void shutdown() {
-    for (FsVolumeImpl volume : volumes.get()) {
+    for (FsVolumeImpl volume : volumes) {
       if(volume != null) {
         volume.shutdown();
       }

Reply via email to