Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 a27516211 -> 2666d51fb


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index b84b1d2..3681cae 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -36,8 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -180,11 +178,10 @@ public class TestStoragePolicySatisfyWorker {
           lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
           lb.getStorageTypes()[0], StorageType.ARCHIVE);
       blockMovingInfos.add(blockMovingInfo);
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
+          blockMovingInfos);
 
-      waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+      waitForBlockMovementCompletion(worker, 1, 30000);
     } finally {
       worker.stop();
     }
@@ -226,50 +223,42 @@ public class TestStoragePolicySatisfyWorker {
                 locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
         blockMovingInfos.add(blockMovingInfo);
       }
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+      worker.processBlockMovingTasks(cluster.getNamesystem().getBlockPoolId(),
+          blockMovingInfos);
       // Wait till results queue build up
-      waitForBlockMovementResult(worker, inode.getId(), 30000);
+      waitForBlockMovementResult(worker, 30000);
       worker.dropSPSWork();
       assertTrue(worker.getBlocksMovementsStatusHandler()
-          .getBlksMovementResults().size() == 0);
+          .getMoveAttemptFinishedBlocks().size() == 0);
     } finally {
       worker.stop();
     }
   }
 
   private void waitForBlockMovementResult(
-      final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
-          throws Exception {
+      final StoragePolicySatisfyWorker worker, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsStatusHandler().getBlksMovementResults();
+        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
+            .getMoveAttemptFinishedBlocks();
         return completedBlocks.size() > 0;
       }
     }, 100, timeout);
   }
 
   private void waitForBlockMovementCompletion(
-      final StoragePolicySatisfyWorker worker, final long inodeId,
-      int expectedFailedItemsCount, int timeout) throws Exception {
+      final StoragePolicySatisfyWorker worker,
+      int expectedFinishedItemsCount, int timeout) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlocksStorageMovementResult> completedBlocks = worker
-            .getBlocksMovementsStatusHandler().getBlksMovementResults();
-        int failedCount = 0;
-        for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
-          if (blkMovementResult.getStatus() ==
-              BlocksStorageMovementResult.Status.FAILURE) {
-            failedCount++;
-          }
-        }
+        List<Block> completedBlocks = worker.getBlocksMovementsStatusHandler()
+            .getMoveAttemptFinishedBlocks();
+        int finishedCount = completedBlocks.size();
         LOG.info("Block movement completed count={}, expected={} and 
actual={}",
-            completedBlocks.size(), expectedFailedItemsCount, failedCount);
-        return expectedFailedItemsCount == failedCount;
+            completedBlocks.size(), expectedFinishedItemsCount, finishedCount);
+        return expectedFinishedItemsCount == finishedCount;
       }
     }, 100, timeout);
   }
@@ -304,8 +293,7 @@ public class TestStoragePolicySatisfyWorker {
   private BlockMovingInfo prepareBlockMovingInfo(Block block,
       DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
       StorageType targetStorageType) {
-    return new BlockMovingInfo(block, new DatanodeInfo[] {src},
-        new DatanodeInfo[] {destin}, new StorageType[] {storageType},
-        new StorageType[] {targetStorageType});
+    return new BlockMovingInfo(block, src, destin, storageType,
+        targetStorageType);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index df120ca..20402f2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -112,7 +112,7 @@ public class TestStorageReport {
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
         Mockito.any(SlowDiskReports.class),
-        Mockito.any(BlocksStorageMovementResult[].class));
+        Mockito.any(BlocksStorageMoveAttemptFinished.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 1e016f7..ec00ae7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -958,7 +958,7 @@ public class NNThroughputBenchmark implements Tool {
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
           SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMovementResult[0]).getCommands();
+          new BlocksStorageMoveAttemptFinished(null)).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1009,7 +1009,7 @@ public class NNThroughputBenchmark implements Tool {
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
           SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-          new BlocksStorageMovementResult[0]).getCommands();
+          new BlocksStorageMoveAttemptFinished(null)).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index ba29c82..b2b878d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -40,7 +40,7 @@ import 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -128,7 +128,7 @@ public class NameNodeAdapter {
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-        new BlocksStorageMovementResult[0]);
+        new BlocksStorageMoveAttemptFinished(null));
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 7918821..f79326f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -18,10 +18,17 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import 
org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
 
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,9 +49,8 @@ public class TestBlockStorageMovementAttemptedItems {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
         Mockito.mock(Namesystem.class),
         Mockito.mock(StoragePolicySatisfier.class), 100);
-    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
-        selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
+        selfRetryTimeout, unsatisfiedStorageMovementFiles);
   }
 
   @After
@@ -76,120 +82,115 @@ public class TestBlockStorageMovementAttemptedItems {
     return isItemFound;
   }
 
+  /**
+   * Verify that moved blocks reporting should queued up the block info.
+   */
   @Test(timeout = 30000)
-  public void testAddResultWithFailureResult() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
-    Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
-    assertTrue(checkItemMovedForRetry(item, 200));
-  }
-
-  @Test(timeout = 30000)
-  public void testAddResultWithSucessResult() throws Exception {
+  public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
     bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
-    assertFalse(checkItemMovedForRetry(item, 200));
+    List<Block> blocks = new ArrayList<Block>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    Block[] blockArray = new Block[blocks.size()];
+    blocks.toArray(blockArray);
+    bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+    assertEquals("Failed to receive result!", 1,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
   }
 
+  /**
+   * Verify empty moved blocks reporting queue.
+   */
   @Test(timeout = 30000)
-  public void testNoResultAdded() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
+  public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception 
{
+    bsmAttemptedItems.start(); // start block movement report monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), true);
-    // After self retry timeout, it should be added back for retry
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 600));
-    assertEquals("Failed to remove from the attempted list", 0,
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks));
+    assertEquals("Shouldn't receive result", 0,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
+    assertEquals("Item doesn't exist in the attempted list", 1,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
-   * first occurrence is #blockStorageMovementResultCheck() and then
+   * Partial block movement with
+   * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, first 
occurrence
+   * is #blockStorageMovementReportedItemsCheck() and then
    * #blocksStorageMovementUnReportedItemsCheck().
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried1() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
-
-    // start block movement result monitor thread
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    blocks.add(new Block(5678L));
+    Long trackID = 0L;
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+    // start block movement report monitor thread
     bsmAttemptedItems.start();
     assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
+        checkItemMovedForRetry(trackID, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
-   * first occurrence is #blocksStorageMovementUnReportedItemsCheck() and then
-   * #blockStorageMovementResultCheck().
+   * Partial block movement. Here, first occurrence is
+   * #blocksStorageMovementUnReportedItemsCheck() and then
+   * #blockStorageMovementReportedItemsCheck().
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementShouldBeRetried2() throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
 
     Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
 
     bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
-    bsmAttemptedItems.blockStorageMovementResultCheck();
+    bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
 
     assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
+        checkItemMovedForRetry(trackID, 5000));
     assertEquals("Failed to remove from the attempted list", 0,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 
   /**
-   * Partial block movement with only BlocksStorageMovementResult#FAILURE
-   * result and storageMovementAttemptedItems list is empty.
+   * Partial block movement with only BlocksStorageMoveAttemptFinished report
+   * and storageMovementAttemptedItems list is empty.
    */
   @Test(timeout = 30000)
   public void testPartialBlockMovementWithEmptyAttemptedQueue()
       throws Exception {
     Long item = new Long(1234);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item, BlocksStorageMovementResult.Status.FAILURE)});
-    bsmAttemptedItems.blockStorageMovementResultCheck();
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
     assertFalse(
         "Should not add in queue again if it is not there in"
             + " storageMovementAttemptedItems",
-        checkItemMovedForRetry(item, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-
-  /**
-   * Partial block movement with BlocksStorageMovementResult#FAILURE result and
-   * storageMovementAttemptedItems.
-   */
-  @Test(timeout = 30000)
-  public void testPartialBlockMovementShouldBeRetried4() throws Exception {
-    Long item = new Long(1234);
-    bsmAttemptedItems.add(new ItemInfo(0L, item), false);
-    bsmAttemptedItems.addResults(
-        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
-            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
-    bsmAttemptedItems.blockStorageMovementResultCheck();
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(item, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
+        checkItemMovedForRetry(trackID, 5000));
+    assertEquals("Failed to remove from the attempted list", 1,
         bsmAttemptedItems.getAttemptedItemsCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 87b8e79..2a8777d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -44,7 +44,7 @@ import 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -141,7 +141,7 @@ public class TestDeadDatanode {
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
             SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
-            new BlocksStorageMovementResult[0]).getCommands();
+            new BlocksStorageMoveAttemptFinished(null)).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 57e9f94..70219f6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -203,11 +203,11 @@ public class TestStoragePolicySatisfier {
   }
 
   /**
-   * Tests to verify that the block storage movement results will be propagated
+   * Tests to verify that the block storage movement report will be propagated
    * to Namenode via datanode heartbeat.
    */
   @Test(timeout = 300000)
-  public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+  public void testBlksStorageMovementAttemptFinishedReport() throws Exception {
     try {
       createCluster();
       // Change policy to ONE_SSD
@@ -229,7 +229,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 2, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -276,7 +276,7 @@ public class TestStoragePolicySatisfier {
             fileName, StorageType.DISK, 2, 30000, dfs);
       }
 
-      waitForBlocksMovementResult(files.size(), 30000);
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
     } finally {
       shutdownCluster();
     }
@@ -457,7 +457,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 2, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -630,7 +630,7 @@ public class TestStoragePolicySatisfier {
       // No block movement will be scheduled as there is no target node
       // available with the required storage type.
       waitForAttemptedItems(1, 30000);
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
       DFSTestUtil.waitExpectedStorageType(
           file1, StorageType.ARCHIVE, 1, 30000, dfs);
       DFSTestUtil.waitExpectedStorageType(
@@ -691,7 +691,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file, StorageType.DISK, 3, 30000, dfs);
 
-      waitForBlocksMovementResult(1, 30000);
+      waitForBlocksMovementAttemptReport(1, 30000);
     } finally {
       shutdownCluster();
     }
@@ -871,7 +871,7 @@ public class TestStoragePolicySatisfier {
       Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
           .getBlockManager().getDatanodeManager().getDatanodes();
       for (DatanodeDescriptor dd : dns) {
-        assertNull(dd.getBlocksToMoveStorages());
+        assertNull(dd.getBlocksToMoveStorages(1));
       }
 
       // Enable heart beats now
@@ -1224,7 +1224,7 @@ public class TestStoragePolicySatisfier {
   /**
    * Test SPS for batch processing.
    */
-  @Test(timeout = 300000)
+  @Test(timeout = 3000000)
   public void testBatchProcessingForSPSDirectory() throws Exception {
     try {
       StorageType[][] diskTypes = new StorageType[][] {
@@ -1252,7 +1252,7 @@ public class TestStoragePolicySatisfier {
         DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
             30000, dfs);
       }
-      waitForBlocksMovementResult(files.size(), 30000);
+      waitForBlocksMovementAttemptReport(files.size(), 30000);
       String expectedLogMessage = "StorageMovementNeeded queue remaining"
           + " capacity is zero";
       assertTrue("Log output does not contain expected log message: "
@@ -1268,7 +1268,7 @@ public class TestStoragePolicySatisfier {
    *  1. Delete /root when traversing Q
    *  2. U, R, S should not be in queued.
    */
-  @Test
+  @Test(timeout = 300000)
   public void testTraverseWhenParentDeleted() throws Exception {
     StorageType[][] diskTypes = new StorageType[][] {
         {StorageType.DISK, StorageType.ARCHIVE},
@@ -1330,7 +1330,7 @@ public class TestStoragePolicySatisfier {
    *  1. Delete L when traversing Q
    *  2. E, M, U, R, S should not be in queued.
    */
-  @Test
+  @Test(timeout = 300000)
   public void testTraverseWhenRootParentDeleted() throws Exception {
     StorageType[][] diskTypes = new StorageType[][] {
         {StorageType.DISK, StorageType.ARCHIVE},
@@ -1387,6 +1387,82 @@ public class TestStoragePolicySatisfier {
     dfs.delete(new Path("/root"), true);
   }
 
+  /**
+   * Test storage move blocks while under replication block tasks exists in the
+   * system. So, both will share the max transfer streams.
+   *
+   * 1. Create cluster with 3 datanode.
+   * 2. Create 20 files with 2 replica.
+   * 3. Start 2 more DNs with DISK & SSD types
+   * 4. SetReplication factor for the 1st 10 files to 4 to trigger replica task
+   * 5. Set policy to SSD to the 2nd set of files from 11-20
+   * 6. Call SPS for 11-20 files to trigger move block tasks to new DNs
+   * 7. Wait for the under replica and SPS tasks completion
+   */
+  @Test(timeout = 300000)
+  public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception {
+    try {
+      config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3);
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      config.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      config.setBoolean(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+          true);
+
+      StorageType[][] storagetypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2)
+          .storageTypes(storagetypes).build();
+      hdfsCluster.waitActive();
+      dfs = hdfsCluster.getFileSystem();
+
+      // Below files will be used for pending replication block tasks.
+      for (int i=1; i<=20; i++){
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 
2,
+            0);
+      }
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.SSD},
+              {StorageType.DISK, StorageType.SSD}};
+      startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      // increase replication factor to 4 for the first 10 files and thus
+      // initiate replica tasks
+      for (int i=1; i<=10; i++){
+        Path filePath = new Path("/file" + i);
+        dfs.setReplication(filePath, (short) 4);
+      }
+
+      // invoke SPS for 11-20 files
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        dfs.setStoragePolicy(filePath, "ALL_SSD");
+        dfs.satisfyStoragePolicy(filePath);
+      }
+
+      for (int i = 1; i <= 10; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem());
+      }
+      for (int i = 11; i <= 20; i++) {
+        Path filePath = new Path("/file" + i);
+        DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+            StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem());
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private static void createDirectoryTree(DistributedFileSystem dfs)
       throws Exception {
     // tree structure
@@ -1514,18 +1590,19 @@ public class TestStoragePolicySatisfier {
     }, 100, timeout);
   }
 
-  private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
-      int timeout) throws TimeoutException, InterruptedException {
+  private void waitForBlocksMovementAttemptReport(
+      long expectedMovementFinishedBlocksCount, int timeout)
+          throws TimeoutException, InterruptedException {
     BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = 
blockManager.getStoragePolicySatisfier();
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedBlkMovResultsCount,
-            sps.getAttemptedItemsMonitor().resultsCount());
-        return sps.getAttemptedItemsMonitor()
-            .resultsCount() == expectedBlkMovResultsCount;
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
index fc5d0a5..154ddae 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -180,7 +180,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
       LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
       cluster.triggerHeartbeats();
 
-      waitForBlocksMovementResult(cluster, 1, 60000);
+      waitForBlocksMovementAttemptReport(cluster, 9, 60000);
       // verify storage types and locations
       waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 
9,
           9, 60000);
@@ -290,7 +290,7 @@ public class TestStoragePolicySatisfierWithStripedFile {
       LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
       cluster.triggerHeartbeats();
 
-      waitForBlocksMovementResult(cluster, 1, 60000);
+      waitForBlocksMovementAttemptReport(cluster, 5, 60000);
       waitForAttemptedItems(cluster, 1, 30000);
       // verify storage types and locations.
       waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 
5,
@@ -556,10 +556,10 @@ public class TestStoragePolicySatisfierWithStripedFile {
     }, 100, timeout);
   }
 
-  // Check whether the block movement result has been arrived at the
+  // Check whether the block movement attempt report has been arrived at the
   // Namenode(SPS).
-  private void waitForBlocksMovementResult(MiniDFSCluster cluster,
-      long expectedBlkMovResultsCount, int timeout)
+  private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
+      long expectedMovementFinishedBlocksCount, int timeout)
           throws TimeoutException, InterruptedException {
     BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final StoragePolicySatisfier sps = 
blockManager.getStoragePolicySatisfier();
@@ -568,11 +568,11 @@ public class TestStoragePolicySatisfierWithStripedFile {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        LOG.info("expectedResultsCount={} actualResultsCount={}",
-            expectedBlkMovResultsCount,
-            sps.getAttemptedItemsMonitor().resultsCount());
-        return sps.getAttemptedItemsMonitor()
-            .resultsCount() == expectedBlkMovResultsCount;
+        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
+            expectedMovementFinishedBlocksCount,
+            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
+        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
+            >= expectedMovementFinishedBlocksCount;
       }
     }, 100, timeout);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to