This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3b2a7aabf7af538fa337b8c155295a922f014b3e
Author: Rakesh Radhakrishnan <rake...@apache.org>
AuthorDate: Wed May 8 17:20:21 2019 +0530

    HDFS-14401. Refine the implementation for HDFS cache on SCM. Contributed by Feilong He.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  14 --
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  22 ---
 .../datanode/fsdataset/impl/FsDatasetCache.java    |  31 ++--
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |   2 +
 .../fsdataset/impl/MappableBlockLoader.java        |  19 +-
 .../fsdataset/impl/MappableBlockLoaderFactory.java |  47 +++++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  21 +--
 .../fsdataset/impl/PmemMappableBlockLoader.java    |  40 ++---
 .../datanode/fsdataset/impl/PmemMappedBlock.java   |  10 +-
 .../datanode/fsdataset/impl/PmemVolumeManager.java | 197 +++++++++++++++------
 .../src/main/resources/hdfs-default.xml            |  24 ---
 .../impl/TestCacheByPmemMappableBlockLoader.java   |  26 ++-
 .../fsdataset/impl/TestFsDatasetCache.java         |   5 +-
 13 files changed, 256 insertions(+), 202 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8ad1652..ce0c6d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
-import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
-import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader;
 import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -382,22 +380,10 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   public static final String DFS_DATANODE_CACHE_REVOCATION_POLLING_MS = 
"dfs.datanode.cache.revocation.polling.ms";
   public static final long DFS_DATANODE_CACHE_REVOCATION_POLLING_MS_DEFAULT = 
500L;
 
-  // Currently, the available cache loaders are MemoryMappableBlockLoader,
-  // PmemMappableBlockLoader. MemoryMappableBlockLoader is the default cache
-  // loader to cache block replica to memory.
-  public static final String DFS_DATANODE_CACHE_LOADER_CLASS =
-      "dfs.datanode.cache.loader.class";
-  public static final Class<? extends MappableBlockLoader>
-      DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT =
-      MemoryMappableBlockLoader.class;
   // Multiple dirs separated by "," are acceptable.
   public static final String DFS_DATANODE_CACHE_PMEM_DIRS_KEY =
       "dfs.datanode.cache.pmem.dirs";
   public static final String DFS_DATANODE_CACHE_PMEM_DIRS_DEFAULT = "";
-  // The cache capacity of persistent memory
-  public static final String DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY =
-      "dfs.datanode.cache.pmem.capacity";
-  public static final long DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT = 0L;
 
   public static final String 
DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = 
"dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean 
DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 6ee8e92..139ad77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,10 +27,6 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
@@ -71,7 +67,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.Util;
-import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlockLoader;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 
 import java.util.concurrent.TimeUnit;
@@ -121,9 +116,7 @@ public class DNConf {
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
 
-  private final Class<? extends MappableBlockLoader> cacheLoaderClass;
   final long maxLockedMemory;
-  private final long maxLockedPmem;
   private final String[] pmemDirs;
 
   private final long bpReadyTimeout;
@@ -266,17 +259,10 @@ public class DNConf {
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
 
-    this.cacheLoaderClass = getConf().getClass(DFS_DATANODE_CACHE_LOADER_CLASS,
-        DFS_DATANODE_CACHE_LOADER_CLASS_DEFAULT, MappableBlockLoader.class);
-
     this.maxLockedMemory = getConf().getLong(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
 
-    this.maxLockedPmem = getConf().getLong(
-        DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY,
-        DFS_DATANODE_CACHE_PMEM_CAPACITY_DEFAULT);
-
     this.pmemDirs = getConf().getTrimmedStrings(
         DFS_DATANODE_CACHE_PMEM_DIRS_KEY);
 
@@ -342,10 +328,6 @@ public class DNConf {
     return maxLockedMemory;
   }
 
-  public long getMaxLockedPmem() {
-    return maxLockedPmem;
-  }
-
   /**
    * Returns true if connect to datanode via hostname
    * 
@@ -449,10 +431,6 @@ public class DNConf {
     return maxDataLength;
   }
 
-  public Class<? extends MappableBlockLoader> getCacheLoaderClass() {
-    return cacheLoaderClass;
-  }
-
   public String[] getPmemVolumes() {
     return pmemDirs;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index fb67cc4..ae6192e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -183,9 +182,8 @@ public class FsDatasetCache {
     this.memCacheStats = new MemoryCacheStats(
         dataset.datanode.getDnConf().getMaxLockedMemory());
 
-    Class<? extends MappableBlockLoader> cacheLoaderClass =
-        dataset.datanode.getDnConf().getCacheLoaderClass();
-    this.cacheLoader = ReflectionUtils.newInstance(cacheLoaderClass, null);
+    this.cacheLoader = MappableBlockLoaderFactory.createCacheLoader(
+        this.getDnConf());
     cacheLoader.initialize(this);
   }
 
@@ -213,7 +211,7 @@ public class FsDatasetCache {
       return null;
     }
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
-    return cacheLoader.getCachedPath(key);
+    return PmemVolumeManager.getInstance().getCachePath(key);
   }
 
   /**
@@ -380,14 +378,13 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = cacheLoader.reserve(length);
+      long newUsedBytes = cacheLoader.reserve(key, length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
-          LOG.warn("Failed to cache " + key + ": could not reserve " + length +
-              " more bytes in the cache: " +
-              cacheLoader.getCacheCapacityConfigKey() +
-              " of " + cacheLoader.getCacheCapacity() + " exceeded.");
+          LOG.warn("Failed to cache " + key + ": could not reserve " +
+              "more bytes in the cache: " + cacheLoader.getCacheCapacity() +
+              " exceeded when try to reserve " + length + "bytes.");
           return;
         }
         reservedBytes = true;
@@ -442,10 +439,10 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            cacheLoader.release(length);
+            cacheLoader.release(key, length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
-                  + "bytes in total.", key, memCacheStats.getCacheUsed());
+                  + "bytes in total.", key, cacheLoader.getCacheUsed());
           IOUtils.closeQuietly(mappableBlock);
           numBlocksFailedToCache.incrementAndGet();
 
@@ -519,7 +516,8 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = cacheLoader.release(value.mappableBlock.getLength());
+      long newUsedBytes = cacheLoader.
+          release(key, value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -592,4 +590,11 @@ public class FsDatasetCache {
   MappableBlockLoader getCacheLoader() {
     return cacheLoader;
   }
+
+  /**
+   * This method can be executed during DataNode shutdown.
+   */
+  void shutdown() {
+    cacheLoader.shutdown();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 29c31ef..801b4c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2340,6 +2340,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> 
{
                      "from LazyWriter.join");
       }
     }
+
+    cacheManager.shutdown();
   }
 
   @Override // FSDatasetMBean
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index a9e9610..044e5c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -65,26 +65,25 @@ public abstract class MappableBlockLoader {
   /**
    * Try to reserve some given bytes.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to add.
    *
    * @return              The new number of usedBytes if we succeeded;
    *                      -1 if we failed.
    */
-  abstract long reserve(long bytesCount);
+  abstract long reserve(ExtendedBlockId key, long bytesCount);
 
   /**
    * Release some bytes that we're using.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to release.
    *
    * @return              The new number of usedBytes.
    */
-  abstract long release(long bytesCount);
-
-  /**
-   * Get the config key of cache capacity.
-   */
-  abstract String getCacheCapacityConfigKey();
+  abstract long release(ExtendedBlockId key, long bytesCount);
 
   /**
    * Get the approximate amount of cache space used.
@@ -102,9 +101,11 @@ public abstract class MappableBlockLoader {
   abstract boolean isTransientCache();
 
   /**
-   * Get a cache file path if applicable. Otherwise return null.
+   * Clean up cache, can be used during DataNode shutdown.
    */
-  abstract String getCachedPath(ExtendedBlockId key);
+  void shutdown() {
+    // Do nothing.
+  }
 
   /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
new file mode 100644
index 0000000..43b1b53
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+
+/**
+ * Creates MappableBlockLoader.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class MappableBlockLoaderFactory {
+
+  private MappableBlockLoaderFactory() {
+    // Prevent instantiation
+  }
+
+  /**
+   * Create a specific cache loader according to the configuration.
+   * If persistent memory volume is not configured, return a cache loader
+   * for DRAM cache. Otherwise, return a cache loader for pmem cache.
+   */
+  public static MappableBlockLoader createCacheLoader(DNConf conf) {
+    if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) {
+      return new MemoryMappableBlockLoader();
+    }
+    return new PmemMappableBlockLoader();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 4b7af19..919835a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -22,11 +22,12 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
@@ -42,11 +43,13 @@ import java.nio.channels.FileChannel;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MemoryMappableBlockLoader.class);
   private MemoryCacheStats memCacheStats;
 
   @Override
   void initialize(FsDatasetCache cacheManager) throws IOException {
+    LOG.info("Initializing cache loader: MemoryMappableBlockLoader.");
     this.memCacheStats = cacheManager.getMemCacheStats();
   }
 
@@ -149,11 +152,6 @@ public class MemoryMappableBlockLoader extends 
MappableBlockLoader {
   }
 
   @Override
-  public String getCacheCapacityConfigKey() {
-    return DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
-  }
-
-  @Override
   public long getCacheUsed() {
     return memCacheStats.getCacheUsed();
   }
@@ -164,12 +162,12 @@ public class MemoryMappableBlockLoader extends 
MappableBlockLoader {
   }
 
   @Override
-  long reserve(long bytesCount) {
+  long reserve(ExtendedBlockId key, long bytesCount) {
     return memCacheStats.reserve(bytesCount);
   }
 
   @Override
-  long release(long bytesCount) {
+  long release(ExtendedBlockId key, long bytesCount) {
     return memCacheStats.release(bytesCount);
   }
 
@@ -177,9 +175,4 @@ public class MemoryMappableBlockLoader extends 
MappableBlockLoader {
   public boolean isTransientCache() {
     return true;
   }
-
-  @Override
-  public String getCachedPath(ExtendedBlockId key) {
-    return null;
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
index c581d31..05a9ba7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java
@@ -18,12 +18,10 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
@@ -53,14 +51,10 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
 
   @Override
   void initialize(FsDatasetCache cacheManager) throws IOException {
+    LOG.info("Initializing cache loader: PmemMappableBlockLoader.");
     DNConf dnConf = cacheManager.getDnConf();
-    this.pmemVolumeManager = new PmemVolumeManager(dnConf.getMaxLockedPmem(),
-        dnConf.getPmemVolumes());
-  }
-
-  @VisibleForTesting
-  PmemVolumeManager getPmemVolumeManager() {
-    return pmemVolumeManager;
+    PmemVolumeManager.init(dnConf.getPmemVolumes());
+    pmemVolumeManager = PmemVolumeManager.getInstance();
   }
 
   /**
@@ -69,7 +63,7 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
    * Map the block and verify its checksum.
    *
    * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir
-   * is a persistent memory volume selected by getOneLocation() method.
+   * is a persistent memory volume chosen by PmemVolumeManager.
    *
    * @param length         The current length of the block.
    * @param blockIn        The block input stream. Should be positioned at the
@@ -100,8 +94,7 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
         throw new IOException("Block InputStream has no FileChannel.");
       }
 
-      Byte volumeIndex = pmemVolumeManager.getOneVolumeIndex();
-      filePath = pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+      filePath = pmemVolumeManager.getCachePath(key);
       file = new RandomAccessFile(filePath, "rw");
       out = file.getChannel().
           map(FileChannel.MapMode.READ_WRITE, 0, length);
@@ -111,9 +104,7 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
       }
       verifyChecksumAndMapBlock(out, length, metaIn, blockChannel,
           blockFileName);
-      mappableBlock = new PmemMappedBlock(
-          length, volumeIndex, key, pmemVolumeManager);
-      pmemVolumeManager.afterCache(key, volumeIndex);
+      mappableBlock = new PmemMappedBlock(length, key);
       LOG.info("Successfully cached one replica:{} into persistent memory"
           + ", [cached path={}, length={}]", key, filePath, length);
     } finally {
@@ -123,6 +114,7 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
       }
       IOUtils.closeQuietly(file);
       if (mappableBlock == null) {
+        LOG.debug("Delete {} due to unsuccessful mapping.", filePath);
         FsDatasetUtil.deleteMappedFile(filePath);
       }
     }
@@ -194,11 +186,6 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
   }
 
   @Override
-  public String getCacheCapacityConfigKey() {
-    return DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
-  }
-
-  @Override
   public long getCacheUsed() {
     return pmemVolumeManager.getCacheUsed();
   }
@@ -209,13 +196,13 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
   }
 
   @Override
-  long reserve(long bytesCount) {
-    return pmemVolumeManager.reserve(bytesCount);
+  long reserve(ExtendedBlockId key, long bytesCount) {
+    return pmemVolumeManager.reserve(key, bytesCount);
   }
 
   @Override
-  long release(long bytesCount) {
-    return pmemVolumeManager.release(bytesCount);
+  long release(ExtendedBlockId key, long bytesCount) {
+    return pmemVolumeManager.release(key, bytesCount);
   }
 
   @Override
@@ -224,7 +211,8 @@ public class PmemMappableBlockLoader extends 
MappableBlockLoader {
   }
 
   @Override
-  public String getCachedPath(ExtendedBlockId key) {
-    return pmemVolumeManager.getCacheFilePath(key);
+  void shutdown() {
+    LOG.info("Clean up cache on persistent memory during shutdown.");
+    PmemVolumeManager.getInstance().cleanup();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
index ce4fa22..25c3d40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java
@@ -35,18 +35,13 @@ import java.io.IOException;
 public class PmemMappedBlock implements MappableBlock {
   private static final Logger LOG =
       LoggerFactory.getLogger(PmemMappedBlock.class);
-  private final PmemVolumeManager pmemVolumeManager;
   private long length;
-  private Byte volumeIndex = null;
   private ExtendedBlockId key;
 
-  PmemMappedBlock(long length, Byte volumeIndex, ExtendedBlockId key,
-                  PmemVolumeManager pmemVolumeManager) {
+  PmemMappedBlock(long length, ExtendedBlockId key) {
     assert length > 0;
     this.length = length;
-    this.volumeIndex = volumeIndex;
     this.key = key;
-    this.pmemVolumeManager = pmemVolumeManager;
   }
 
   @Override
@@ -57,10 +52,9 @@ public class PmemMappedBlock implements MappableBlock {
   @Override
   public void close() {
     String cacheFilePath =
-        pmemVolumeManager.inferCacheFilePath(volumeIndex, key);
+        PmemVolumeManager.getInstance().getCachePath(key);
     try {
       FsDatasetUtil.deleteMappedFile(cacheFilePath);
-      pmemVolumeManager.afterUncache(key);
       LOG.info("Successfully uncached one replica:{} from persistent memory"
           + ", [cached path={}, length={}]", key, cacheFilePath, length);
     } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
index 76aa2dd..2d77f7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemVolumeManager.java
@@ -35,6 +35,7 @@ import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,14 +46,19 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class PmemVolumeManager {
+public final class PmemVolumeManager {
 
   /**
    * Counts used bytes for persistent memory.
    */
-  private class UsedBytesCount {
+  private static class UsedBytesCount {
+    private final long maxBytes;
     private final AtomicLong usedBytes = new AtomicLong(0);
 
+    UsedBytesCount(long maxBytes) {
+      this.maxBytes = maxBytes;
+    }
+
     /**
      * Try to reserve more bytes.
      *
@@ -65,7 +71,7 @@ public class PmemVolumeManager {
       while (true) {
         long cur = usedBytes.get();
         long next = cur + bytesCount;
-        if (next > cacheCapacity) {
+        if (next > maxBytes) {
           return -1;
         }
         if (usedBytes.compareAndSet(cur, next)) {
@@ -85,42 +91,76 @@ public class PmemVolumeManager {
       return usedBytes.addAndGet(-bytesCount);
     }
 
-    long get() {
+    long getUsedBytes() {
       return usedBytes.get();
     }
+
+    long getMaxBytes() {
+      return maxBytes;
+    }
+
+    long getAvailableBytes() {
+      return maxBytes - usedBytes.get();
+    }
   }
 
   private static final Logger LOG =
       LoggerFactory.getLogger(PmemVolumeManager.class);
+  public static final String CACHE_DIR = "hdfs_pmem_cache";
+  private static PmemVolumeManager pmemVolumeManager = null;
   private final ArrayList<String> pmemVolumes = new ArrayList<>();
   // Maintain which pmem volume a block is cached to.
   private final Map<ExtendedBlockId, Byte> blockKeyToVolume =
       new ConcurrentHashMap<>();
-  private final UsedBytesCount usedBytesCount;
+  private final List<UsedBytesCount> usedBytesCounts = new ArrayList<>();
 
   /**
    * The total cache capacity in bytes of persistent memory.
-   * It is 0L if the specific mappableBlockLoader couldn't cache data to pmem.
    */
-  private final long cacheCapacity;
+  private long cacheCapacity;
+  private static long maxBytesPerPmem = -1;
   private int count = 0;
-  // Strict atomic operation is not guaranteed for the performance sake.
-  private int i = 0;
+  private byte nextIndex = 0;
 
-  PmemVolumeManager(long maxBytes, String[] pmemVolumesConfigured)
-      throws IOException {
-    if (pmemVolumesConfigured == null || pmemVolumesConfigured.length == 0) {
+  private PmemVolumeManager(String[] pmemVolumesConfig) throws IOException {
+    if (pmemVolumesConfig == null || pmemVolumesConfig.length == 0) {
       throw new IOException("The persistent memory volume, " +
           DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY +
           " is not configured!");
     }
-    this.loadVolumes(pmemVolumesConfigured);
-    this.usedBytesCount = new UsedBytesCount();
-    this.cacheCapacity = maxBytes;
+    this.loadVolumes(pmemVolumesConfig);
+    cacheCapacity = 0L;
+    for (UsedBytesCount counter : usedBytesCounts) {
+      cacheCapacity += counter.getMaxBytes();
+    }
+  }
+
+  public synchronized static void init(String[] pmemVolumesConfig)
+      throws IOException {
+    if (pmemVolumeManager == null) {
+      pmemVolumeManager = new PmemVolumeManager(pmemVolumesConfig);
+    }
+  }
+
+  public static PmemVolumeManager getInstance() {
+    if (pmemVolumeManager == null) {
+      throw new RuntimeException(
+          "The pmemVolumeManager should be instantiated!");
+    }
+    return pmemVolumeManager;
+  }
+
+  @VisibleForTesting
+  public static void setMaxBytes(long maxBytes) {
+    maxBytesPerPmem = maxBytes;
   }
 
   public long getCacheUsed() {
-    return usedBytesCount.get();
+    long usedBytes = 0L;
+    for (UsedBytesCount counter : usedBytesCounts) {
+      usedBytes += counter.getUsedBytes();
+    }
+    return usedBytes;
   }
 
   public long getCacheCapacity() {
@@ -130,24 +170,40 @@ public class PmemVolumeManager {
   /**
    * Try to reserve more bytes on persistent memory.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to add.
    *
    * @return              The new number of usedBytes if we succeeded;
    *                      -1 if we failed.
    */
-  long reserve(long bytesCount) {
-    return usedBytesCount.reserve(bytesCount);
+  synchronized long reserve(ExtendedBlockId key, long bytesCount) {
+    try {
+      byte index = chooseVolume(bytesCount);
+      long usedBytes = usedBytesCounts.get(index).reserve(bytesCount);
+      // Put the entry into blockKeyToVolume if reserving bytes succeeded.
+      if (usedBytes > 0) {
+        blockKeyToVolume.put(key, index);
+      }
+      return usedBytes;
+    } catch (IOException e) {
+      LOG.warn(e.getMessage());
+      return -1L;
+    }
   }
 
   /**
    * Release some bytes that we're using on persistent memory.
    *
+   * @param key           The ExtendedBlockId for a block.
+   *
    * @param bytesCount    The number of bytes to release.
    *
    * @return              The new number of usedBytes.
    */
-  long release(long bytesCount) {
-    return usedBytesCount.release(bytesCount);
+  long release(ExtendedBlockId key, long bytesCount) {
+    Byte index = blockKeyToVolume.remove(key);
+    return usedBytesCounts.get(index).release(bytesCount);
   }
 
   /**
@@ -155,46 +211,70 @@ public class PmemVolumeManager {
    *
    * @throws IOException   If there is no available pmem volume.
    */
-  private void loadVolumes(String[] volumes) throws IOException {
+  private void loadVolumes(String[] volumes)
+      throws IOException {
     // Check whether the volume exists
-    for (String volume: volumes) {
+    for (byte n = 0; n < volumes.length; n++) {
       try {
-        File pmemDir = new File(volume);
-        verifyIfValidPmemVolume(pmemDir);
-        // Remove all files under the volume.
-        FileUtils.cleanDirectory(pmemDir);
+        File pmemDir = new File(volumes[n]);
+        File realPmemDir = verifyIfValidPmemVolume(pmemDir);
+        this.pmemVolumes.add(realPmemDir.getPath());
+        long maxBytes;
+        if (maxBytesPerPmem == -1) {
+          maxBytes = realPmemDir.getUsableSpace();
+        } else {
+          maxBytes = maxBytesPerPmem;
+        }
+        UsedBytesCount usedBytesCount = new UsedBytesCount(maxBytes);
+        this.usedBytesCounts.add(usedBytesCount);
+        LOG.info("Added persistent memory - {} with size={}",
+            volumes[n], maxBytes);
       } catch (IllegalArgumentException e) {
-        LOG.error("Failed to parse persistent memory volume " + volume, e);
+        LOG.error("Failed to parse persistent memory volume " + volumes[n], e);
         continue;
       } catch (IOException e) {
-        LOG.error("Bad persistent memory volume: " + volume, e);
+        LOG.error("Bad persistent memory volume: " + volumes[n], e);
         continue;
       }
-      pmemVolumes.add(volume);
-      LOG.info("Added persistent memory - " + volume);
     }
     count = pmemVolumes.size();
     if (count == 0) {
       throw new IOException(
           "At least one valid persistent memory volume is required!");
     }
+    cleanup();
+  }
+
+  void cleanup() {
+    // Remove all files under the volume.
+    for (String pmemDir: pmemVolumes) {
+      try {
+        FileUtils.cleanDirectory(new File(pmemDir));
+      } catch (IOException e) {
+        LOG.error("Failed to clean up " + pmemDir, e);
+      }
+    }
   }
 
   @VisibleForTesting
-  static void verifyIfValidPmemVolume(File pmemDir)
+  static File verifyIfValidPmemVolume(File pmemDir)
       throws IOException {
     if (!pmemDir.exists()) {
       final String message = pmemDir + " does not exist";
       throw new IOException(message);
     }
-
     if (!pmemDir.isDirectory()) {
       final String message = pmemDir + " is not a directory";
       throw new IllegalArgumentException(message);
     }
 
+    File realPmemDir = new File(getRealPmemDir(pmemDir.getPath()));
+    if (!realPmemDir.exists() && !realPmemDir.mkdir()) {
+      throw new IOException("Failed to create " + realPmemDir.getPath());
+    }
+
     String uuidStr = UUID.randomUUID().toString();
-    String testFilePath = pmemDir.getPath() + "/.verify.pmem." + uuidStr;
+    String testFilePath = realPmemDir.getPath() + "/.verify.pmem." + uuidStr;
     byte[] contents = uuidStr.getBytes("UTF-8");
     RandomAccessFile testFile = null;
     MappedByteBuffer out = null;
@@ -203,15 +283,17 @@ public class PmemVolumeManager {
       out = testFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
           contents.length);
       if (out == null) {
-        throw new IOException("Failed to map the test file under " + pmemDir);
+        throw new IOException(
+            "Failed to map the test file under " + realPmemDir);
       }
       out.put(contents);
       // Forces to write data to storage device containing the mapped file
       out.force();
+      return realPmemDir;
     } catch (IOException e) {
       throw new IOException(
           "Exception while writing data to persistent storage dir: " +
-              pmemDir, e);
+              realPmemDir, e);
     } finally {
       if (out != null) {
         out.clear();
@@ -229,18 +311,38 @@ public class PmemVolumeManager {
     }
   }
 
+  public static String getRealPmemDir(String rawPmemDir) {
+    return new File(rawPmemDir, CACHE_DIR).getAbsolutePath();
+  }
+
   /**
    * Choose a persistent memory volume based on a specific algorithm.
    * Currently it is a round-robin policy.
    *
    * TODO: Refine volume selection policy by considering storage utilization.
    */
-  Byte getOneVolumeIndex() throws IOException {
-    if (count != 0) {
-      return (byte)(i++ % count);
-    } else {
+  synchronized Byte chooseVolume(long bytesCount) throws IOException {
+    if (count == 0) {
       throw new IOException("No usable persistent memory is found");
     }
+    int k = 0;
+    long maxAvailableSpace = 0L;
+    while (k++ != count) {
+      if (nextIndex == count) {
+        nextIndex = 0;
+      }
+      byte index = nextIndex++;
+      long availableBytes = usedBytesCounts.get(index).getAvailableBytes();
+      if (availableBytes >= bytesCount) {
+        return index;
+      }
+      if (availableBytes > maxAvailableSpace) {
+        maxAvailableSpace = availableBytes;
+      }
+    }
+    throw new IOException("There is no enough persistent memory space " +
+        "for caching. The current max available space is " +
+        maxAvailableSpace + ", but " + bytesCount + " is required.");
   }
 
   @VisibleForTesting
@@ -276,7 +378,7 @@ public class PmemVolumeManager {
   /**
    * The cache file path is pmemVolume/BlockPoolId-BlockId.
    */
-  public String getCacheFilePath(ExtendedBlockId key) {
+  public String getCachePath(ExtendedBlockId key) {
     Byte volumeIndex = blockKeyToVolume.get(key);
     if (volumeIndex == null) {
       return  null;
@@ -288,19 +390,4 @@ public class PmemVolumeManager {
   Map<ExtendedBlockId, Byte> getBlockKeyToVolume() {
     return blockKeyToVolume;
   }
-
-  /**
-   * Add cached block's ExtendedBlockId and its cache volume index to a map
-   * after cache.
-   */
-  public void afterCache(ExtendedBlockId key, Byte volumeIndex) {
-    blockKeyToVolume.put(key, volumeIndex);
-  }
-
-  /**
-   * Remove the record in blockKeyToVolume for uncached block after uncache.
-   */
-  public void afterUncache(ExtendedBlockId key) {
-    blockKeyToVolume.remove(key);
-  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3fcb387..12bacc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2511,18 +2511,6 @@
 </property>
 
 <property>
-  <name>dfs.datanode.cache.loader.class</name>
-  
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryMappableBlockLoader</value>
-  <description>
-    Currently, the available cache loaders are MemoryMappableBlockLoader, 
PmemMappableBlockLoader.
-    By default, MemoryMappableBlockLoader is used and it maps block replica 
into memory.
-    PmemMappableBlockLoader can map block to persistent memory with mapped 
byte buffer, which is
-    implemented by Java code. The value of dfs.datanode.cache.pmem.dirs 
specifies the persistent
-    memory directory.
-  </description>
-</property>
-
-<property>
   <name>dfs.datanode.max.locked.memory</name>
   <value>0</value>
   <description>
@@ -2539,18 +2527,6 @@
 </property>
 
 <property>
-  <name>dfs.datanode.cache.pmem.capacity</name>
-  <value>0</value>
-  <description>
-    The amount of persistent memory in bytes that can be used to cache block
-    replicas to persistent memory. Currently, persistent memory is only enabled
-    in HDFS Centralized Cache Management feature.
-
-    By default, this parameter is 0, which disables persistent memory caching.
-  </description>
-</property>
-
-<property>
   <name>dfs.datanode.cache.pmem.dirs</name>
   <value></value>
   <description>
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
index 9b4f06f..58812db 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestCacheByPmemMappableBlockLoader.java
@@ -21,8 +21,6 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_LOADER_CLASS;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_CACHE_PMEM_DIRS_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -139,14 +137,11 @@ public class TestCacheByPmemMappableBlockLoader {
     conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
 
     // Configuration for pmem cache
-    conf.set(DFS_DATANODE_CACHE_LOADER_CLASS,
-        "org.apache.hadoop.hdfs.server.datanode." +
-            "fsdataset.impl.PmemMappableBlockLoader");
     new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
     new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
     // Configure two bogus pmem volumes
     conf.set(DFS_DATANODE_CACHE_PMEM_DIRS_KEY, PMEM_DIR_0 + "," + PMEM_DIR_1);
-    conf.setLong(DFS_DATANODE_CACHE_PMEM_CAPACITY_KEY, CACHE_CAPACITY);
+    PmemVolumeManager.setMaxBytes((long) (CACHE_CAPACITY * 0.5));
 
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
@@ -183,18 +178,17 @@ public class TestCacheByPmemMappableBlockLoader {
 
   @Test
   public void testPmemVolumeManager() throws IOException {
-    PmemVolumeManager pmemVolumeManager =
-        cacheLoader.getPmemVolumeManager();
+    PmemVolumeManager pmemVolumeManager = PmemVolumeManager.getInstance();
     assertNotNull(pmemVolumeManager);
     assertEquals(CACHE_CAPACITY, pmemVolumeManager.getCacheCapacity());
     // Test round-robin selection policy
     long count1 = 0, count2 = 0;
     for (int i = 0; i < 10; i++) {
-      Byte index = pmemVolumeManager.getOneVolumeIndex();
+      Byte index = pmemVolumeManager.chooseVolume(BLOCK_SIZE);
       String volume = pmemVolumeManager.getVolumeByIndex(index);
-      if (volume.equals(PMEM_DIR_0)) {
+      if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_0))) {
         count1++;
-      } else if (volume.equals(PMEM_DIR_1)) {
+      } else if (volume.equals(PmemVolumeManager.getRealPmemDir(PMEM_DIR_1))) {
         count2++;
       } else {
         fail("Unexpected persistent storage location:" + volume);
@@ -254,7 +248,7 @@ public class TestCacheByPmemMappableBlockLoader {
     // The pmem cache space is expected to have been used up.
     assertEquals(CACHE_CAPACITY, cacheManager.getPmemCacheUsed());
     Map<ExtendedBlockId, Byte> blockKeyToVolume =
-        cacheLoader.getPmemVolumeManager().getBlockKeyToVolume();
+        PmemVolumeManager.getInstance().getBlockKeyToVolume();
     // All block keys should be kept in blockKeyToVolume
     assertEquals(blockKeyToVolume.size(), maxCacheBlocksNum);
     assertTrue(blockKeyToVolume.keySet().containsAll(blockKeys));
@@ -266,11 +260,13 @@ public class TestCacheByPmemMappableBlockLoader {
       // to pmem.
       assertNotNull(cachePath);
       String expectFileName =
-          cacheLoader.getPmemVolumeManager().getCacheFileName(key);
+          PmemVolumeManager.getInstance().getCacheFileName(key);
       if (cachePath.startsWith(PMEM_DIR_0)) {
-        assertTrue(cachePath.equals(PMEM_DIR_0 + "/" + expectFileName));
+        assertTrue(cachePath.equals(PmemVolumeManager
+            .getRealPmemDir(PMEM_DIR_0) + "/" + expectFileName));
       } else if (cachePath.startsWith(PMEM_DIR_1)) {
-        assertTrue(cachePath.equals(PMEM_DIR_1 + "/" + expectFileName));
+        assertTrue(cachePath.equals(PmemVolumeManager
+            .getRealPmemDir(PMEM_DIR_1) + "/" + expectFileName));
       } else {
         fail("The cache path is not the expected one: " + cachePath);
       }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
index 9060584..ab3cfd9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
@@ -401,9 +401,10 @@ public class TestFsDatasetCache {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
+        // check the log reported by FsDatasetCache
+        // in the case that cache capacity is exceeded.
         int lines = appender.countLinesWithMessage(
-            "more bytes in the cache: " +
-            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+            "could not reserve more bytes in the cache: ");
         return lines > 0;
       }
     }, 500, 30000);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to