This is an automated email from the ASF dual-hosted git repository.

zhangshuyan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ef2322b6d7c HDFS-17243. Add the parameter storage type for getBlocks method (#6238). Contributed by Haiyang Hu.
4ef2322b6d7c is described below

commit 4ef2322b6d7c1d1ae1cd9e62042f2db0ae42fc7c
Author: huhaiyang <huhaiyang...@126.com>
AuthorDate: Mon Nov 6 11:20:25 2023 +0800

    HDFS-17243. Add the parameter storage type for getBlocks method (#6238). Contributed by Haiyang Hu.
    
    Reviewed-by: He Xiaoqiao <hexiaoq...@apache.org>
    Reviewed-by: Tao Li <toms...@apache.org>
    Signed-off-by: Shuyan Zhang <zhangshu...@apache.org>
---
 .../federation/router/RouterNamenodeProtocol.java  |   7 +-
 .../server/federation/router/RouterRpcServer.java  |   4 +-
 .../server/federation/router/TestRouterRpc.java    |   6 +-
 .../NamenodeProtocolServerSideTranslatorPB.java    |   4 +-
 .../protocolPB/NamenodeProtocolTranslatorPB.java   |  11 ++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java    |   2 +-
 .../hdfs/server/balancer/NameNodeConnector.java    |   5 +-
 .../hdfs/server/blockmanagement/BlockManager.java  |   7 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |   9 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |   4 +-
 .../hdfs/server/protocol/NamenodeProtocol.java     |   4 +-
 .../src/main/proto/NamenodeProtocol.proto          |   1 +
 .../java/org/apache/hadoop/hdfs/TestGetBlocks.java | 110 ++++++++++++++++++---
 .../hadoop/hdfs/server/balancer/TestBalancer.java  |   2 +-
 .../balancer/TestBalancerWithHANameNodes.java      |   2 +-
 15 files changed, 142 insertions(+), 36 deletions(-)
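
Before the per-file diff, a minimal sketch of how a balancer-style caller could use the extended method. The helper class and the parameter values are illustrative assumptions, not part of the commit; only the getBlocks signature matches the NamenodeProtocol change below.

    import java.io.IOException;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

    // Hypothetical helper; 'namenode' and 'datanode' are assumed to come from
    // NameNodeProxies.createProxy and getDatanodeReport, as in the tests below.
    public class GetBlocksByStorageTypeSketch {
      static BlocksWithLocations ssdBlocksOf(NamenodeProtocol namenode,
          DatanodeInfo datanode) throws IOException {
        return namenode.getBlocks(datanode,
            1024L * 1024 * 1024, // size: total bytes of block data to return
            10L * 1024 * 1024,   // minBlockSize: skip blocks smaller than this
            0L,                  // hotBlockTimeInterval: 0 disables the hot-file filter
            StorageType.SSD);    // new parameter: only blocks on SSD storages;
                                 // passing null keeps the old any-storage behavior
      }
    }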

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
index 278d282fd7e6..a5a047d115cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -53,7 +54,7 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
 
     // Get the namespace where the datanode is located
@@ -79,8 +80,8 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
     if (nsId != null) {
       RemoteMethod method = new RemoteMethod(
           NamenodeProtocol.class, "getBlocks", new Class<?>[]
-          {DatanodeInfo.class, long.class, long.class, long.class},
-          datanode, size, minBlockSize, hotBlockTimeInterval);
+          {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
+          datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
       return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
     }
     return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index cae61b7d927d..2aa2eae5305d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1612,9 +1612,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     return nnProto.getBlocks(datanode, size, minBlockSize,
-            hotBlockTimeInterval);
+            hotBlockTimeInterval, storageType);
   }
 
   @Override // NamenodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index d44b40b05238..93e905b4eaff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -1385,9 +1385,11 @@ public class TestRouterRpc {
 
     // Verify that getting blocks for the datanode works.
     BlocksWithLocations routerBlockLocations =
-        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+            null);
     BlocksWithLocations nnBlockLocations =
-        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+            null);
     BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
     BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
     assertEquals(nnBlocks.length, routerBlocks.length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index e89a6b62b507..f4025366391c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -89,7 +89,9 @@ public class NamenodeProtocolServerSideTranslatorPB implements
     BlocksWithLocations blocks;
     try {
       blocks = impl.getBlocks(dnInfo, request.getSize(),
-          request.getMinBlockSize(), request.getTimeInterval());
+          request.getMinBlockSize(), request.getTimeInterval(),
+          request.hasStorageType() ?
+              PBHelperClient.convertStorageType(request.getStorageType()): null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index fd40e0ecef34..87518aa1e231 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
@@ -101,11 +102,15 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
-    GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
+    GetBlocksRequestProto.Builder builder = GetBlocksRequestProto.newBuilder()
         .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
-        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
+        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval);
+    if (storageType != null) {
+      builder.setStorageType(PBHelperClient.convertStorageType(storageType));
+    }
+    GetBlocksRequestProto req = builder.build();
     return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req)
         .getBlocks()));
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 98a6d8449b62..6ad0e4d22a85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -839,7 +839,7 @@ public class Dispatcher {
       final long size = Math.min(getBlocksSize, blocksToReceive);
       final BlocksWithLocations newBlksLocs =
           nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
-              hotBlockTimeInterval);
+              hotBlockTimeInterval, storageType);
 
       if (LOG.isTraceEnabled()) {
         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 34be025203d4..e8274fdbe779 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -255,7 +256,7 @@ public class NameNodeConnector implements Closeable {
 
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval) throws IOException {
+      minBlockSize, long timeInterval, StorageType storageType) throws IOException {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
@@ -274,7 +275,7 @@ public class NameNodeConnector implements Closeable {
       } else {
         nnProxy = namenode;
       }
-      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval);
+      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
     } finally {
       if (isRequestStandby) {
         LOG.info("Request #getBlocks to Standby NameNode success. " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 2351bb478287..2d216be94577 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1720,8 +1720,8 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Get all blocks with location information from a datanode. */
   public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
-      final long size, final long minBlockSize, final long timeInterval) throws
-      UnregisteredNodeException {
+      final long size, final long minBlockSize, final long timeInterval,
+      final StorageType storageType) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
       blockLog.warn("BLOCK* getBlocks: Asking for blocks from an" +
@@ -1735,10 +1735,11 @@ public class BlockManager implements BlockStatsMXBean {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
 
-    // skip stale storage
+    // Skip stale storages, then keep only those of the requested storage type.
     DatanodeStorageInfo[] storageInfos = Arrays
         .stream(node.getStorageInfos())
         .filter(s -> !s.areBlockContentsStale())
+        .filter(s -> storageType == null || s.getStorageType().equals(storageType))
         .toArray(DatanodeStorageInfo[]::new);
 
     // starting from a random block
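
A minimal, self-contained sketch of the filter semantics added above, using a hypothetical Storage record (not the HDFS DatanodeStorageInfo class; needs Java 16+ for records): a null storage type acts as a wildcard.

    import java.util.Arrays;

    public class StorageFilterSketch {
      enum StorageType { DISK, SSD }
      record Storage(StorageType type, boolean stale) {}

      static Storage[] select(Storage[] all, StorageType wanted) {
        return Arrays.stream(all)
            .filter(s -> !s.stale())                            // skip stale storages
            .filter(s -> wanted == null || s.type() == wanted)  // null matches any type
            .toArray(Storage[]::new);
      }

      public static void main(String[] args) {
        Storage[] storages = {
            new Storage(StorageType.DISK, false),
            new Storage(StorageType.SSD, false),
            new Storage(StorageType.SSD, true)
        };
        System.out.println(select(storages, null).length);            // 2: every fresh storage
        System.out.println(select(storages, StorageType.SSD).length); // 1: fresh SSD only
      }
    }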
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 3d360c6d0dd2..7918daf6b9db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1946,10 +1946,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *
    * @param datanode on which blocks are located
    * @param size total size of blocks
-   * @param minimumBlockSize
+   * @param minimumBlockSize each returned block should be at least this size
+   * @param timeInterval prefer blocks that belong to
+   *                     cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    */
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
-      minimumBlockSize, long timeInterval) throws IOException {
+      minimumBlockSize, long timeInterval, StorageType storageType) throws IOException {
     OperationCategory checkOp =
         isGetBlocksCheckOperationEnabled ? OperationCategory.READ :
             OperationCategory.UNCHECKED;
@@ -1958,7 +1961,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(checkOp);
       return getBlockManager().getBlocksWithLocations(datanode, size,
-          minimumBlockSize, timeInterval);
+          minimumBlockSize, timeInterval, storageType);
     } finally {
       readUnlock("getBlocks");
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 4a041dbec275..f02688d1629f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -649,7 +649,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   /////////////////////////////////////////////////////
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
     String operationName = "getBlocks";
     if(size <= 0) {
@@ -663,7 +663,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
-    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
+    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
   }
 
   @Override // NamenodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 44ffb85f79ec..03ddc5ef8b1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -76,6 +77,7 @@ public interface NamenodeProtocol {
    * @param minBlockSize each block should be of this minimum Block Size
    * @param hotBlockTimeInterval prefer to get blocks which are belong to
    * the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    * @return BlocksWithLocations a list of blocks &amp; their locations
    * @throws IOException if size is less than or equal to 0 or
   datanode does not exist
@@ -83,7 +85,7 @@ public interface NamenodeProtocol {
   @Idempotent
   @ReadOnly
   BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long hotBlockTimeInterval) throws IOException;
+      minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException;
 
   /**
    * Get the current block keys
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index 32cdade055ee..29a9aa01b68d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -48,6 +48,7 @@ message GetBlocksRequestProto {
   // For more info refer HDFS-13356
   optional uint64 minBlockSize = 3 [default = 10485760];
   optional uint64 timeInterval = 4 [default = 0];
+  optional StorageTypeProto storageType = 5;
 }
 
  
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
index cad7d0fb5026..5abb8adc14e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.*;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -239,26 +241,29 @@ public class TestGetBlocks {
           DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
 
       // Should return all 13 blocks, as minBlockSize is not passed
-      locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0,
+          null).getBlocks();
       assertEquals(blkLocsSize, locs.length);
 
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
       assertEquals(locs[1].getStorageIDs().length, replicationFactor);
 
       // Should return 12 blocks, as minBlockSize is blkSize
-      locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0,
+          null).getBlocks();
       assertEquals(blkLocsSize - 1, locs.length);
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
       assertEquals(locs[1].getStorageIDs().length, replicationFactor);
 
       // get blocks of size BlockSize from dataNodes[0]
       locs = namenode.getBlocks(dataNodes[0], blkSize,
-          blkSize, 0).getBlocks();
+          blkSize, 0, null).getBlocks();
       assertEquals(locs.length, 1);
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
 
       // get blocks of size 1 from dataNodes[0]
-      locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], 1, 1, 0,
+          null).getBlocks();
       assertEquals(locs.length, 1);
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
 
@@ -283,7 +288,8 @@ public class TestGetBlocks {
 
       // Namenode should refuse to provide block locations to the balancer
       // while in safemode.
-      locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0,
+          null).getBlocks();
       assertEquals(blkLocsSize, locs.length);
       assertFalse(fs.isInSafeMode());
       LOG.info("Entering safe mode");
@@ -310,7 +316,8 @@ public class TestGetBlocks {
 
    // Namenode should refuse; the call should fail.
     LambdaTestUtils.intercept(exClass,
-        msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0));
+        msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0,
+            null));
   }
 
   /**
@@ -450,7 +457,7 @@ public class TestGetBlocks {
           .getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks();
 
       BlockWithLocations[] locsAll = namenode.getBlocks(
-          dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
+          dataNodes[0], fileLen*2, 0, hotInterval, null).getBlocks();
       assertEquals(locsAll.length, 4);
 
       for(int i = 0; i < blockNum; i++) {
@@ -461,7 +468,7 @@ public class TestGetBlocks {
       }
 
       BlockWithLocations[]  locs2 = namenode.getBlocks(
-          dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
+          dataNodes[0], fileLen*2, 0, hotInterval, null).getBlocks();
       for(int i = 0; i < 2; i++) {
         assertTrue(belongToFile(locs2[i], locatedBlocksOld));
       }
@@ -508,7 +515,7 @@ public class TestGetBlocks {
 
     // check blocks count equals to blockNum
     BlockWithLocations[] blocks = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+        dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(blockNum, blocks.length);
 
     // calculate the block count on storage[0]
@@ -524,13 +531,94 @@ public class TestGetBlocks {
     // set storage[0] stale
     storageInfos[0].setBlockContentsStale(true);
     blocks = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+        dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(blockNum - count, blocks.length);
 
     // set all storage stale
     bm0.getDatanodeManager().markAllDatanodesStale();
     blocks = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+        dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(0, blocks.length);
   }
+
+  @Test
+  public void testChooseSpecifyStorageType() throws Exception {
+    final short repFactor = (short) 1;
+    final int fileLen = BLOCK_SIZE;
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .storageTypes(new StorageType[] {StorageType.DISK, StorageType.SSD}).
+        storagesPerDatanode(2).build()) {
+      cluster.waitActive();
+
+      // Get storage info.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+      DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);
+      BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+      DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
+          .getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();
+      assert Arrays.stream(storageInfos)
+          .anyMatch(datanodeStorageInfo -> {
+            String storageTypeName = datanodeStorageInfo.getStorageType().name();
+            return storageTypeName.equals("SSD") || storageTypeName.equals("DISK");
+          }) : "No 'SSD' or 'DISK' storage types found.";
+
+      // Create an HDFS file with the ALL_SSD storage policy.
+      Path ssdDir = new Path("/testChooseSSD");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path ssdFile = new Path(ssdDir, "file");
+      fs.mkdirs(ssdDir);
+      fs.setStoragePolicy(ssdDir, "ALL_SSD");
+      DFSTestUtil.createFile(fs, ssdFile, false, 1024, fileLen,
+          BLOCK_SIZE, repFactor, 0, true);
+      DFSTestUtil.waitReplication(fs, ssdFile, repFactor);
+      BlockLocation[] locations = fs.getClient()
+          .getBlockLocations(ssdFile.toUri().getPath(), 0, Long.MAX_VALUE);
+      assertEquals(1, locations.length);
+      assertEquals("SSD", locations[0].getStorageTypes()[0].name());
+
+      Path diskDir = new Path("/testChooseDisk");
+      fs = cluster.getFileSystem();
+      Path diskFile = new Path(diskDir, "file");
+      fs.mkdirs(diskDir);
+      fs.setStoragePolicy(diskDir, "HOT");
+      DFSTestUtil.createFile(fs, diskFile, false, 1024, fileLen,
+          BLOCK_SIZE, repFactor, 0, true);
+      DFSTestUtil.waitReplication(fs, diskFile, repFactor);
+      locations = fs.getClient()
+          .getBlockLocations(diskFile.toUri().getPath(), 0, Long.MAX_VALUE);
+      assertEquals(1, locations.length);
+      assertEquals("DISK", locations[0].getStorageTypes()[0].name());
+
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+          DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+      // Check the total block count:
+      // if no StorageType is specified (null), all blocks are returned.
+      BlockWithLocations[] blocks = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, 0,
+          null).getBlocks();
+      assertEquals(2, blocks.length);
+
+      // Check the count of blocks with a StorageType of DISK.
+      blocks = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, 0,
+          StorageType.DISK).getBlocks();
+      assertEquals(1, blocks.length);
+      assertEquals("DISK", blocks[0].getStorageTypes()[0].name());
+
+      // Check the count of blocks with a StorageType of SSD.
+      blocks = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, 0,
+          StorageType.SSD).getBlocks();
+      assertEquals(1, blocks.length);
+      assertEquals("SSD", blocks[0].getStorageTypes()[0].name());
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 2a56c25d0d46..23d1cb441bb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1891,7 +1891,7 @@ public class TestBalancer {
         numGetBlocksCalls.incrementAndGet();
         return blk;
       }}).when(fsnSpy).getBlocks(any(DatanodeID.class),
-        anyLong(), anyLong(), anyLong());
+        anyLong(), anyLong(), anyLong(), any(StorageType.class));
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index d69051c8d7af..dbd76ee61451 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -277,7 +277,7 @@ public class TestBalancerWithHANameNodes {
         int expectedObserverIdx = withObserverFailure ? 3 : 2;
         int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
         verify(namesystemSpies.get(i), times(expectedCount))
-            .getBlocks(any(), anyLong(), anyLong(), anyLong());
+            .getBlocks(any(), anyLong(), anyLong(), anyLong(), any());
       }
     } finally {
       if (qjmhaCluster != null) {

