http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 508da85..a08bd2f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -44,6 +44,7 @@ import javax.management.ObjectName;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -55,10 +56,11 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -79,6 +81,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import 
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import 
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -88,8 +91,14 @@ import 
org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+import static 
org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static 
org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
@@ -187,7 +196,11 @@ public class BlockManager implements BlockStatsMXBean {
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
-  /** Blocks to be invalidated. */
+  /**
+   * Blocks to be invalidated.
+   * For a striped block to invalidate, we should track its individual internal
+   * blocks.
+   */
   private final InvalidateBlocks invalidateBlocks;
   
   /**
@@ -203,8 +216,8 @@ public class BlockManager implements BlockStatsMXBean {
    * Maps a StorageID to the set of blocks that are "extra" for this
    * DataNode. We'll eventually remove these extras.
    */
-  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
-    new TreeMap<String, LightWeightLinkedSet<Block>>();
+  public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap 
=
+    new TreeMap<>();
 
   /**
    * Store set of Blocks that need to be replicated 1 or more times.
@@ -271,12 +284,15 @@ public class BlockManager implements BlockStatsMXBean {
   private double replicationQueuesInitProgress = 0.0;
 
   /** for block replicas placement */
-  private BlockPlacementPolicy blockplacement;
+  private BlockPlacementPolicies placementPolicies;
   private final BlockStoragePolicySuite storagePolicySuite;
 
   /** Check whether name system is running before terminating */
   private boolean checkNSRunning = true;
 
+  /** Check whether there are any non-EC blocks using StripedID */
+  private boolean hasNonEcBlockUsingStripedID = false;
+
   public BlockManager(final Namesystem namesystem, final Configuration conf)
     throws IOException {
     this.namesystem = namesystem;
@@ -292,7 +308,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
-    blockplacement = BlockPlacementPolicy.getInstance(
+    placementPolicies = new BlockPlacementPolicies(
       conf, datanodeManager.getFSClusterStats(),
       datanodeManager.getNetworkTopology(),
       datanodeManager.getHost2DatanodeMap());
@@ -494,15 +510,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   @VisibleForTesting
   public BlockPlacementPolicy getBlockPlacementPolicy() {
-    return blockplacement;
-  }
-
-  /** Set BlockPlacementPolicy */
-  public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
-    if (newpolicy == null) {
-      throw new HadoopIllegalArgumentException("newpolicy == null");
-    }
-    this.blockplacement = newpolicy;
+    return placementPolicies.getPolicy(false);
   }
 
   /** Dump meta data to out. */
@@ -552,9 +560,9 @@ public class BlockManager implements BlockStatsMXBean {
     
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
-    chooseSourceDatanode(block, containingNodes,
+    chooseSourceDatanodes(getStoredBlock(block), containingNodes,
         containingLiveReplicasNodes, numReplicas,
-        UnderReplicatedBlocks.LEVEL);
+        new LinkedList<Short>(), UnderReplicatedBlocks.LEVEL);
     
     // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which 
are 
     // not included in the numReplicas.liveReplicas() count
@@ -601,11 +609,28 @@ public class BlockManager implements BlockStatsMXBean {
     return maxReplicationStreams;
   }
 
-  /**
-   * @return true if the block has minimum replicas
-   */
-  public boolean checkMinReplication(BlockInfo block) {
-    return (countNodes(block).liveReplicas() >= minReplication);
+  public int getDefaultStorageNum(BlockInfo block) {
+    if (block.isStriped()) {
+      return ((BlockInfoStriped) block).getRealTotalBlockNum();
+    } else {
+      return defaultReplication;
+    }
+  }
+
+  public short getMinStorageNum(BlockInfo block) {
+    if (block.isStriped()) {
+      return ((BlockInfoStriped) block).getRealDataBlockNum();
+    } else {
+      return minReplication;
+    }
+  }
+
+  public boolean hasMinStorage(BlockInfo block) {
+    return countNodes(block).liveReplicas() >= getMinStorageNum(block);
+  }
+
+  public boolean hasMinStorage(BlockInfo block, int liveNum) {
+    return liveNum >= getMinStorageNum(block);
   }
 
   /**
@@ -617,16 +642,21 @@ public class BlockManager implements BlockStatsMXBean {
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private static boolean commitBlock(
-      final BlockInfoContiguousUnderConstruction block, final Block 
commitBlock)
-      throws IOException {
-    if (block.getBlockUCState() == BlockUCState.COMMITTED)
-      return false;
-    assert block.getNumBytes() <= commitBlock.getNumBytes() :
-      "commitBlock length is less than the stored one "
-      + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
-    block.commitBlock(commitBlock);
-    return true;
+  private static boolean commitBlock(final BlockInfo block,
+      final Block commitBlock) throws IOException {
+    if (block instanceof BlockInfoUnderConstruction
+        && block.getBlockUCState() != BlockUCState.COMMITTED) {
+      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block;
+
+      assert block.getNumBytes() <= commitBlock.getNumBytes() :
+        "commitBlock length is less than the stored one "
+        + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
+
+      uc.commitBlock(commitBlock);
+      return true;
+    }
+
+    return false;
   }
   
   /**
@@ -649,10 +679,10 @@ public class BlockManager implements BlockStatsMXBean {
     if(lastBlock.isComplete())
       return false; // already completed (e.g. by syncBlock)
     
-    final boolean b = commitBlock(
-        (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
-    if(countNodes(lastBlock).liveReplicas() >= minReplication)
-      completeBlock(bc, bc.numBlocks()-1, false);
+    final boolean b = commitBlock(lastBlock, commitBlock);
+    if (hasMinStorage(lastBlock)) {
+      completeBlock(bc, bc.numBlocks() - 1, false);
+    }
     return b;
   }
 
@@ -665,21 +695,28 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private BlockInfo completeBlock(final BlockCollection bc,
       final int blkIndex, boolean force) throws IOException {
-    if(blkIndex < 0)
+    if (blkIndex < 0) {
       return null;
+    }
     BlockInfo curBlock = bc.getBlocks()[blkIndex];
-    if(curBlock.isComplete())
+    if (curBlock.isComplete()) {
       return curBlock;
-    BlockInfoContiguousUnderConstruction ucBlock =
-        (BlockInfoContiguousUnderConstruction) curBlock;
-    int numNodes = ucBlock.numNodes();
-    if (!force && numNodes < minReplication)
+    }
+
+    int numNodes = curBlock.numNodes();
+    if (!force && !hasMinStorage(curBlock, numNodes)) {
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
-    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
+    }
+    if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
       throw new IOException(
           "Cannot complete block: block has not been COMMITTED by the client");
-    BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+    }
+
+    final BlockInfo completeBlock
+        = !(curBlock instanceof BlockInfoUnderConstruction)? curBlock
+            : ((BlockInfoUnderConstruction)curBlock).convertToCompleteBlock();
+
     // replace penultimate block in file
     bc.setBlock(blkIndex, completeBlock);
     
@@ -690,8 +727,10 @@ public class BlockManager implements BlockStatsMXBean {
     // a "forced" completion when a file is getting closed by an
     // OP_CLOSE edit on the standby).
     namesystem.adjustSafeModeBlockTotals(0, 1);
+    final int minStorage = curBlock.isStriped() ?
+        ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
     namesystem.incrementSafeBlockCount(
-        Math.min(numNodes, minReplication));
+        Math.min(numNodes, minStorage), curBlock);
     
     // replace block in the blocksMap
     return blocksMap.replaceBlock(completeBlock);
@@ -700,10 +739,11 @@ public class BlockManager implements BlockStatsMXBean {
   private BlockInfo completeBlock(final BlockCollection bc,
       final BlockInfo block, boolean force) throws IOException {
     BlockInfo[] fileBlocks = bc.getBlocks();
-    for(int idx = 0; idx < fileBlocks.length; idx++)
-      if(fileBlocks[idx] == block) {
+    for (int idx = 0; idx < fileBlocks.length; idx++) {
+      if (fileBlocks[idx] == block) {
         return completeBlock(bc, idx, force);
       }
+    }
     return block;
   }
   
@@ -713,8 +753,10 @@ public class BlockManager implements BlockStatsMXBean {
    * when tailing edit logs as a Standby.
    */
   public BlockInfo forceCompleteBlock(final BlockCollection bc,
-      final BlockInfoContiguousUnderConstruction block) throws IOException {
-    block.commitBlock(block);
+      final BlockInfo block) throws IOException {
+    if (block instanceof BlockInfoUnderConstruction) {
+      ((BlockInfoUnderConstruction)block).commitBlock(block);
+    }
     return completeBlock(bc, block, true);
   }
 
@@ -744,8 +786,10 @@ public class BlockManager implements BlockStatsMXBean {
 
     DatanodeStorageInfo[] targets = getStorages(oldBlock);
 
-    BlockInfoContiguousUnderConstruction ucBlock =
-        bc.setLastBlock(oldBlock, targets);
+    // convert the last block to UC
+    bc.convertLastBlockToUC(oldBlock, targets);
+    // get the new created uc block
+    BlockInfo ucBlock = bc.getLastBlock();
     blocksMap.replaceBlock(ucBlock);
 
     // Remove block from replication queue.
@@ -756,14 +800,17 @@ public class BlockManager implements BlockStatsMXBean {
 
     // remove this block from the list of pending blocks to be deleted. 
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
+      final Block b = getBlockOnStorage(oldBlock, storage);
+      if (b != null) {
+        invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
+      }
     }
     
     // Adjust safe-mode totals, since under-construction blocks don't
     // count in safe-mode.
     namesystem.adjustSafeModeBlockTotals(
         // decrement safe if we had enough
-        targets.length >= minReplication ? -1 : 0,
+        hasMinStorage(oldBlock, targets.length) ? -1 : 0,
         // always decrement total blocks
         -1);
 
@@ -775,23 +822,25 @@ public class BlockManager implements BlockStatsMXBean {
   /**
    * Get all valid locations of the block
    */
-  private List<DatanodeStorageInfo> getValidLocations(Block block) {
+  private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
+      Block b = getBlockOnStorage(block, storage);
+      if(b != null && 
+          !invalidateBlocks.contains(storage.getDatanodeDescriptor(), b)) {
         locations.add(storage);
       }
     }
     return locations;
   }
-  
+
   private List<LocatedBlock> createLocatedBlockList(
       final BlockInfo[] blocks,
       final long offset, final long length, final int nrBlocksToReturn,
       final AccessMode mode) throws IOException {
-    int curBlk = 0;
+    int curBlk;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -804,10 +853,10 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return Collections.<LocatedBlock>emptyList();
+      return Collections.emptyList();
 
     long endOff = offset + length;
-    List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
+    List<LocatedBlock> results = new ArrayList<>(blocks.length);
     do {
       results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
       curPos += blocks[curBlk].getNumBytes();
@@ -820,7 +869,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
       final long endPos, final AccessMode mode) throws IOException {
-    int curBlk = 0;
+    int curBlk;
     long curPos = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -833,7 +882,7 @@ public class BlockManager implements BlockStatsMXBean {
     
     return createLocatedBlock(blocks[curBlk], curPos, mode);
   }
-  
+
   private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
     final AccessMode mode) throws IOException {
     final LocatedBlock lb = createLocatedBlock(blk, pos);
@@ -844,19 +893,25 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /** @return a LocatedBlock for the given block */
-  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
-      ) throws IOException {
-    if (blk instanceof BlockInfoContiguousUnderConstruction) {
-      if (blk.isComplete()) {
-        throw new IOException(
-            "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
-            + ", blk=" + blk);
+  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) 
{
+    if (!blk.isComplete()) {
+      if (blk.isStriped()) {
+        final BlockInfoStripedUnderConstruction uc =
+            (BlockInfoStripedUnderConstruction) blk;
+        final DatanodeStorageInfo[] storages = 
uc.getExpectedStorageLocations();
+        final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
+            blk);
+        return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
+            false);
+      } else {
+        assert blk instanceof BlockInfoContiguousUnderConstruction;
+        final BlockInfoContiguousUnderConstruction uc =
+            (BlockInfoContiguousUnderConstruction) blk;
+        final DatanodeStorageInfo[] storages = 
uc.getExpectedStorageLocations();
+        final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
+            blk);
+        return newLocatedBlock(eb, storages, pos, false);
       }
-      final BlockInfoContiguousUnderConstruction uc =
-          (BlockInfoContiguousUnderConstruction) blk;
-      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), 
blk);
-      return newLocatedBlock(eb, storages, pos, false);
     }
 
     // get block locations
@@ -873,13 +928,21 @@ public class BlockManager implements BlockStatsMXBean {
         numCorruptNodes == numNodes;
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
     final DatanodeStorageInfo[] machines = new 
DatanodeStorageInfo[numMachines];
-    int j = 0;
+    final int[] blockIndices = blk.isStriped() ? new int[numMachines] : null;
+    int j = 0, i = 0;
     if (numMachines > 0) {
       for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, 
d);
-        if (isCorrupt || (!replicaCorrupt))
+        if (isCorrupt || (!replicaCorrupt)) {
           machines[j++] = storage;
+          // TODO this can be more efficient
+          if (blockIndices != null) {
+            int index = ((BlockInfoStriped) blk).getStorageBlockIndex(storage);
+            assert index >= 0;
+            blockIndices[i++] = index;
+          }
+        }
       }
     }
     assert j == machines.length :
@@ -889,7 +952,9 @@ public class BlockManager implements BlockStatsMXBean {
       " numCorrupt: " + numCorruptNodes +
       " numCorruptRepls: " + numCorruptReplicas;
     final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), 
blk);
-    return newLocatedBlock(eb, machines, pos, isCorrupt);
+    return blockIndices == null ?
+        newLocatedBlock(eb, machines, pos, isCorrupt) :
+        newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
   }
 
   /** Create a LocatedBlocks. */
@@ -897,14 +962,18 @@ public class BlockManager implements BlockStatsMXBean {
       final long fileSizeExcludeBlocksUnderConstruction,
       final boolean isFileUnderConstruction, final long offset,
       final long length, final boolean needBlockToken,
-      final boolean inSnapshot, FileEncryptionInfo feInfo)
+      final boolean inSnapshot, FileEncryptionInfo feInfo,
+      ErasureCodingZone ecZone)
       throws IOException {
     assert namesystem.hasReadLock();
+    final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
+    final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
     if (blocks == null) {
       return null;
     } else if (blocks.length == 0) {
       return new LocatedBlocks(0, isFileUnderConstruction,
-          Collections.<LocatedBlock>emptyList(), null, false, feInfo);
+          Collections.<LocatedBlock> emptyList(), null, false, feInfo, schema,
+          cellSize);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -927,9 +996,9 @@ public class BlockManager implements BlockStatsMXBean {
             fileSizeExcludeBlocksUnderConstruction, mode);
         isComplete = true;
       }
-      return new LocatedBlocks(
-          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
-          locatedblocks, lastlb, isComplete, feInfo);
+      return new LocatedBlocks(fileSizeExcludeBlocksUnderConstruction,
+          isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo,
+          schema, cellSize);
     }
   }
 
@@ -944,9 +1013,23 @@ public class BlockManager implements BlockStatsMXBean {
       final AccessMode mode) throws IOException {
     if (isBlockTokenEnabled()) {
       // Use cached UGI if serving RPC calls.
-      b.setBlockToken(blockTokenSecretManager.generateToken(
-          NameNode.getRemoteUser().getShortUserName(),
-          b.getBlock(), EnumSet.of(mode)));
+      if (b.isStriped()) {
+        LocatedStripedBlock sb = (LocatedStripedBlock) b;
+        int[] indices = sb.getBlockIndices();
+        Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+        ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock());
+        for (int i = 0; i < indices.length; i++) {
+          internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]);
+          blockTokens[i] = blockTokenSecretManager.generateToken(
+              NameNode.getRemoteUser().getShortUserName(),
+              internalBlock, EnumSet.of(mode));
+        }
+        sb.setBlockTokens(blockTokens);
+      } else {
+        b.setBlockToken(blockTokenSecretManager.generateToken(
+            NameNode.getRemoteUser().getShortUserName(),
+            b.getBlock(), EnumSet.of(mode)));
+      }
     }    
   }
 
@@ -1077,7 +1160,7 @@ public class BlockManager implements BlockStatsMXBean {
    
   /** Remove the blocks associated to the given datanode. */
   void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
-    final Iterator<? extends Block> it = node.getBlockIterator();
+    final Iterator<BlockInfo> it = node.getBlockIterator();
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
     }
@@ -1091,12 +1174,15 @@ public class BlockManager implements BlockStatsMXBean {
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
     assert namesystem.hasWriteLock();
-    final Iterator<? extends Block> it = storageInfo.getBlockIterator();
+    final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
-      Block block = it.next();
+      BlockInfo block = it.next();
       removeStoredBlock(block, node);
-      invalidateBlocks.remove(node, block);
+      final Block b = getBlockOnStorage(block, storageInfo);
+      if (b != null) {
+        invalidateBlocks.remove(node, b);
+      }
     }
     namesystem.checkSafeMode();
   }
@@ -1116,22 +1202,32 @@ public class BlockManager implements BlockStatsMXBean {
    * Adds block to list of blocks which will be invalidated on all its
    * datanodes.
    */
-  private void addToInvalidates(Block b) {
+  private void addToInvalidates(BlockInfo storedBlock) {
     if (!namesystem.isPopulatingReplQueues()) {
       return;
     }
     StringBuilder datanodes = new StringBuilder();
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
+        State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      invalidateBlocks.add(b, node, false);
-      datanodes.append(node).append(" ");
+      final Block b = getBlockOnStorage(storedBlock, storage);
+      if (b != null) {
+        invalidateBlocks.add(b, node, false);
+        datanodes.append(node).append(" ");
+      }
     }
     if (datanodes.length() != 0) {
-      blockLog.debug("BLOCK* addToInvalidates: {} {}", b,
+      blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock,
           datanodes.toString());
     }
   }
 
+  private Block getBlockOnStorage(BlockInfo storedBlock,
+      DatanodeStorageInfo storage) {
+    return storedBlock.isStriped() ?
+        ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : 
storedBlock;
+  }
+
   /**
    * Remove all block invalidation tasks under this datanode UUID;
    * used when a datanode registers with a new UUID and the old one
@@ -1155,7 +1251,8 @@ public class BlockManager implements BlockStatsMXBean {
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String storageID, String reason) throws 
IOException {
     assert namesystem.hasWriteLock();
-    final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
+    final Block reportedBlock = blk.getLocalBlock();
+    final BlockInfo storedBlock = getStoredBlock(reportedBlock);
     if (storedBlock == null) {
       // Check if the replica is in the blockMap, if not
       // ignore the request for now. This could happen when BlockScanner
@@ -1172,45 +1269,53 @@ public class BlockManager implements BlockStatsMXBean {
           + ") does not exist");
     }
     
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+    markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
             blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
         storageID == null ? null : node.getStorageInfo(storageID),
         node);
   }
 
   /**
-   * 
-   * @param b
+   * Mark a replica (of a contiguous block) or an internal block (of a striped
+   * block group) as corrupt.
+   * @param b Indicating the reported bad block and the corresponding BlockInfo
+   *          stored in blocksMap.
    * @param storageInfo storage that contains the block, if known. null 
otherwise.
-   * @throws IOException
    */
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
 
-    if (b.corrupted.isDeleted()) {
-      blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
+    if (b.stored.isDeleted()) {
+      blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
           " corrupt as it does not belong to any file", b);
       addToInvalidates(b.corrupted, node);
       return;
     } 
     short expectedReplicas =
-        b.corrupted.getBlockCollection().getPreferredBlockReplication();
+        getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
 
     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.stored);
+      storageInfo.addBlock(b.stored, b.corrupted);
     }
 
-    // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+    // Add this replica to corruptReplicas Map. For striped blocks, we always
+    // use the id of whole striped block group when adding to corruptReplicas
+    Block corrupted = new Block(b.corrupted);
+    if (b.stored.isStriped()) {
+      corrupted.setBlockId(b.stored.getBlockId());
+    }
+    corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason,
         b.reasonCode);
 
     NumberReplicas numberOfReplicas = countNodes(b.stored);
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;
-    boolean minReplicationSatisfied =
-        numberOfReplicas.liveReplicas() >= minReplication;
+
+    boolean minReplicationSatisfied = hasMinStorage(b.stored,
+        numberOfReplicas.liveReplicas());
+
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) 
>
         expectedReplicas;
@@ -1225,7 +1330,7 @@ public class BlockManager implements BlockStatsMXBean {
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
         || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
-      invalidateBlock(b, node);
+      invalidateBlock(b, node, numberOfReplicas);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(b.stored, -1, 0);
@@ -1237,8 +1342,8 @@ public class BlockManager implements BlockStatsMXBean {
    * @return true if the block was successfully invalidated and no longer
    * present in the BlocksMap
    */
-  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
-      ) throws IOException {
+  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
+      NumberReplicas nr) throws IOException {
     blockLog.debug("BLOCK* invalidateBlock: {} on {}", b, dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1247,7 +1352,6 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.debug("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
@@ -1318,15 +1422,15 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Scan blocks in {@link #neededReplications} and assign replication
-   * work to data-nodes they belong to.
+   * Scan blocks in {@link #neededReplications} and assign recovery
+   * (replication or erasure coding) work to data-nodes they belong to.
    *
    * The number of process blocks equals either twice the number of live
    * data-nodes or the number of under-replicated blocks whichever is less.
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  int computeReplicationWork(int blocksToProcess) {
+  int computeBlockRecoveryWork(int blocksToProcess) {
     List<List<BlockInfo>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
@@ -1336,30 +1440,32 @@ public class BlockManager implements BlockStatsMXBean {
     } finally {
       namesystem.writeUnlock();
     }
-    return computeReplicationWorkForBlocks(blocksToReplicate);
+    return computeRecoveryWorkForBlocks(blocksToReplicate);
   }
 
-  /** Replicate a set of blocks
+  /**
+   * Recover a set of blocks to full strength through replication or
+   * erasure coding
    *
-   * @param blocksToReplicate blocks to be replicated, for each priority
+   * @param blocksToRecover blocks to be recovered, for each priority
    * @return the number of blocks scheduled for replication
    */
   @VisibleForTesting
-  int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) 
{
+  int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
-    DatanodeDescriptor srcNode;
-    BlockCollection bc = null;
+    BlockCollection bc;
     int additionalReplRequired;
 
     int scheduledWork = 0;
-    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+    List<BlockRecoveryWork> recovWork = new LinkedList<>();
 
+    // Step 1: categorize at-risk blocks into replication and EC tasks
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        for (int priority = 0; priority < blocksToReplicate.size(); 
priority++) {
-          for (BlockInfo block : blocksToReplicate.get(priority)) {
+        for (int priority = 0; priority < blocksToRecover.size(); priority++) {
+          for (BlockInfo block : blocksToRecover.get(priority)) {
             // block should belong to a file
             bc = blocksMap.getBlockCollection(block);
             // abandoned block or block reopened for append
@@ -1370,31 +1476,34 @@ public class BlockManager implements BlockStatsMXBean {
               continue;
             }
 
-            requiredReplication = bc.getPreferredBlockReplication();
+            requiredReplication = getExpectedReplicaNum(bc, block);
 
             // get a source data-node
-            containingNodes = new ArrayList<DatanodeDescriptor>();
-            List<DatanodeStorageInfo> liveReplicaNodes = new 
ArrayList<DatanodeStorageInfo>();
+            containingNodes = new ArrayList<>();
+            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
             NumberReplicas numReplicas = new NumberReplicas();
-            srcNode = chooseSourceDatanode(
-                block, containingNodes, liveReplicaNodes, numReplicas,
-                priority);
-            if(srcNode == null) { // block can not be replicated from any node
-              LOG.debug("Block " + block + " cannot be repl from any node");
+            List<Short> liveBlockIndices = new ArrayList<>();
+            final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
+                containingNodes, liveReplicaNodes, numReplicas,
+                liveBlockIndices, priority);
+            if(srcNodes == null || srcNodes.length == 0) {
+              // block can not be replicated from any node
+              LOG.debug("Block " + block + " cannot be recovered " +
+                  "from any node");
               continue;
             }
 
-            // liveReplicaNodes can include READ_ONLY_SHARED replicas which 
are 
+            // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
             // not included in the numReplicas.liveReplicas() count
             assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
 
             // do not schedule more if enough replicas is already pending
             numEffectiveReplicas = numReplicas.liveReplicas() +
                                     pendingReplications.getNumReplicas(block);
-      
+
             if (numEffectiveReplicas >= requiredReplication) {
               if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                   (blockHasEnoughRacks(block)) ) {
+                   (blockHasEnoughRacks(block, requiredReplication)) ) {
                 neededReplications.remove(block, priority); // remove from 
neededReplications
                 blockLog.debug("BLOCK* Removing {} from neededReplications as" 
+
                         " it has enough replicas", block);
@@ -1408,9 +1517,20 @@ public class BlockManager implements BlockStatsMXBean {
             } else {
               additionalReplRequired = 1; // Needed on a new rack
             }
-            work.add(new ReplicationWork(block, bc, srcNode,
-                containingNodes, liveReplicaNodes, additionalReplRequired,
-                priority));
+            if (block.isStriped()) {
+              short[] indices = new short[liveBlockIndices.size()];
+              for (int i = 0 ; i < liveBlockIndices.size(); i++) {
+                indices[i] = liveBlockIndices.get(i);
+              }
+              ErasureCodingWork ecw = new ErasureCodingWork(block, bc, 
srcNodes,
+                  containingNodes, liveReplicaNodes, additionalReplRequired,
+                  priority, indices);
+              recovWork.add(ecw);
+            } else {
+              recovWork.add(new ReplicationWork(block, bc, srcNodes,
+                  containingNodes, liveReplicaNodes, additionalReplRequired,
+                  priority));
+            }
           }
         }
       }
@@ -1418,8 +1538,9 @@ public class BlockManager implements BlockStatsMXBean {
       namesystem.writeUnlock();
     }
 
+    // Step 2: choose target nodes for each recovery task
     final Set<Node> excludedNodes = new HashSet<Node>();
-    for(ReplicationWork rw : work){
+    for(BlockRecoveryWork rw : recovWork){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
       excludedNodes.clear();
@@ -1430,12 +1551,15 @@ public class BlockManager implements BlockStatsMXBean {
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
       // It is costly to extract the filename for which chooseTargets is 
called,
       // so for now we pass in the block collection itself.
-      rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
+      final BlockPlacementPolicy placementPolicy =
+          placementPolicies.getPolicy(rw.block.isStriped());
+      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
     }
 
+    // Step 3: add tasks to the DN
     namesystem.writeLock();
     try {
-      for(ReplicationWork rw : work){
+      for(BlockRecoveryWork rw : recovWork){
         final DatanodeStorageInfo[] targets = rw.targets;
         if(targets == null || targets.length == 0){
           rw.targets = null;
@@ -1454,7 +1578,7 @@ public class BlockManager implements BlockStatsMXBean {
             rw.targets = null;
             continue;
           }
-          requiredReplication = bc.getPreferredBlockReplication();
+          requiredReplication = getExpectedReplicaNum(bc, block);
 
           // do not schedule more if enough replicas is already pending
           NumberReplicas numReplicas = countNodes(block);
@@ -1463,7 +1587,7 @@ public class BlockManager implements BlockStatsMXBean {
 
           if (numEffectiveReplicas >= requiredReplication) {
             if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block)) ) {
+                 (blockHasEnoughRacks(block, requiredReplication)) ) {
               neededReplications.remove(block, priority); // remove from 
neededReplications
               rw.targets = null;
               blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1473,8 +1597,8 @@ public class BlockManager implements BlockStatsMXBean {
           }
 
           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-               (!blockHasEnoughRacks(block)) ) {
-            if (rw.srcNode.getNetworkLocation().equals(
+               (!blockHasEnoughRacks(block, requiredReplication)) ) {
+            if (rw.srcNodes[0].getNetworkLocation().equals(
                 targets[0].getDatanodeDescriptor().getNetworkLocation())) {
               //No use continuing, unless a new rack in this case
               continue;
@@ -1482,7 +1606,32 @@ public class BlockManager implements BlockStatsMXBean {
           }
 
           // Add block to the to be replicated list
-          rw.srcNode.addBlockToBeReplicated(block, targets);
+          if (block.isStriped()) {
+            assert rw instanceof ErasureCodingWork;
+            assert rw.targets.length > 0;
+            String src = block.getBlockCollection().getName();
+            ErasureCodingZone ecZone = null;
+            try {
+              ecZone = namesystem.getErasureCodingZoneForPath(src);
+            } catch (IOException e) {
+              blockLog
+                  .warn("Failed to get the EC zone for the file {} ", src);
+            }
+            if (ecZone == null) {
+              blockLog.warn("No EC schema found for the file {}. "
+                  + "So cannot proceed for recovery", src);
+              // TODO: we may have to revisit later for what we can do better 
to
+              // handle this case.
+              continue;
+            }
+            rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+                new ExtendedBlock(namesystem.getBlockPoolId(), block),
+                rw.srcNodes, rw.targets,
+                ((ErasureCodingWork) rw).liveBlockIndicies,
+                ecZone.getSchema(), ecZone.getCellSize());
+          } else {
+            rw.srcNodes[0].addBlockToBeReplicated(block, targets);
+          }
           scheduledWork++;
           DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
@@ -1506,15 +1655,15 @@ public class BlockManager implements BlockStatsMXBean {
 
     if (blockLog.isInfoEnabled()) {
       // log which blocks have been scheduled for replication
-      for(ReplicationWork rw : work){
+      for(BlockRecoveryWork rw : recovWork){
         DatanodeStorageInfo[] targets = rw.targets;
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
-          for (int k = 0; k < targets.length; k++) {
+          for (DatanodeStorageInfo target : targets) {
             targetList.append(' ');
-            targetList.append(targets[k].getDatanodeDescriptor());
+            targetList.append(target.getDatanodeDescriptor());
           }
-          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
+          blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
               rw.block, targetList);
         }
       }
@@ -1530,7 +1679,7 @@ public class BlockManager implements BlockStatsMXBean {
   /** Choose target for WebHDFS redirection. */
   public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
       DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
-    return blockplacement.chooseTarget(src, 1, clientnode,
+    return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode,
         Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
         blocksize, storagePolicySuite.getDefaultPolicy());
   }
@@ -1542,9 +1691,10 @@ public class BlockManager implements BlockStatsMXBean {
       List<DatanodeStorageInfo> chosen,
       Set<Node> excludes,
       long blocksize,
-      byte storagePolicyID) {
-    
+      byte storagePolicyID,
+      boolean isStriped) {
     final BlockStoragePolicy storagePolicy = 
storagePolicySuite.getPolicy(storagePolicyID);
+    final BlockPlacementPolicy blockplacement = 
placementPolicies.getPolicy(isStriped);
     return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
         chosen, true, excludes, blocksize, storagePolicy);
   }
@@ -1562,10 +1712,12 @@ public class BlockManager implements BlockStatsMXBean {
       final Set<Node> excludedNodes,
       final long blocksize,
       final List<String> favoredNodes,
-      final byte storagePolicyID) throws IOException {
+      final byte storagePolicyID,
+      final boolean isStriped) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors = 
         getDatanodeDescriptors(favoredNodes);
     final BlockStoragePolicy storagePolicy = 
storagePolicySuite.getPolicy(storagePolicyID);
+    final BlockPlacementPolicy blockplacement = 
placementPolicies.getPolicy(isStriped);
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize, 
         favoredDatanodeDescriptors, storagePolicy);
@@ -1600,55 +1752,59 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Parse the data-nodes the block belongs to and choose one,
-   * which will be the replication source.
+   * Parse the data-nodes the block belongs to and choose a certain number
+   * from them to be the recovery sources.
    *
    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
-   * Otherwise we choose a random node among those that did not reach their
-   * replication limits.  However, if the replication is of the highest 
priority
-   * and all nodes have reached their replication limits, we will choose a
-   * random node despite the replication limit.
+   * Otherwise we randomly choose nodes among those that did not reach their
+   * replication limits. However, if the recovery work is of the highest
+   * priority and all nodes have reached their replication limits, we will
+   * randomly choose the desired number of nodes despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
    *
    * @param block Block for which a replication source is needed
-   * @param containingNodes List to be populated with nodes found to contain 
the 
-   *                        given block
-   * @param nodesContainingLiveReplicas List to be populated with nodes found 
to
-   *                                    contain live replicas of the given 
block
-   * @param numReplicas NumberReplicas instance to be initialized with the 
-   *                                   counts of live, corrupt, excess, and
-   *                                   decommissioned replicas of the given
-   *                                   block.
+   * @param containingNodes List to be populated with nodes found to contain
+   *                        the given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found
+   *                                    to contain live replicas of the given
+   *                                    block
+   * @param numReplicas NumberReplicas instance to be initialized with the
+   *                    counts of live, corrupt, excess, and decommissioned
+   *                    replicas of the given block.
+   * @param liveBlockIndices List to be populated with indices of healthy
+   *                         blocks in a striped block group
    * @param priority integer representing replication priority of the given
    *                 block
-   * @return the DatanodeDescriptor of the chosen node from which to replicate
-   *         the given block
-   */
-   @VisibleForTesting
-   DatanodeDescriptor chooseSourceDatanode(Block block,
-       List<DatanodeDescriptor> containingNodes,
-       List<DatanodeStorageInfo>  nodesContainingLiveReplicas,
-       NumberReplicas numReplicas,
-       int priority) {
+   * @return the array of DatanodeDescriptor of the chosen nodes from which to
+   *         recover the given block
+   */
+  @VisibleForTesting
+  DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
+      List<DatanodeDescriptor> containingNodes,
+      List<DatanodeStorageInfo> nodesContainingLiveReplicas,
+      NumberReplicas numReplicas,
+      List<Short> liveBlockIndices, int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
-    DatanodeDescriptor srcNode = null;
+    List<DatanodeDescriptor> srcNodes = new ArrayList<>();
     int live = 0;
     int decommissioned = 0;
     int decommissioning = 0;
     int corrupt = 0;
     int excess = 0;
-    
+    liveBlockIndices.clear();
+    final boolean isStriped = block.isStriped();
+
     Collection<DatanodeDescriptor> nodesCorrupt = 
corruptReplicas.getNodes(block);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      LightWeightLinkedSet<Block> excessBlocks =
+      LightWeightLinkedSet<BlockInfo> excessBlocks =
         excessReplicateMap.get(node.getDatanodeUuid());
-      int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; 
+      int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt += countableReplica;
       else if (node.isDecommissionInProgress()) {
@@ -1683,21 +1839,25 @@ public class BlockManager implements BlockStatsMXBean {
       if(node.isDecommissioned())
         continue;
 
-      // We got this far, current node is a reasonable choice
-      if (srcNode == null) {
-        srcNode = node;
+      if(isStriped || srcNodes.isEmpty()) {
+        srcNodes.add(node);
+        if (isStriped) {
+          liveBlockIndices.add((short) ((BlockInfoStriped) block).
+              getStorageBlockIndex(storage));
+        }
         continue;
       }
-      // switch to a different node randomly
+      // for replicated block, switch to a different node randomly
       // this to prevent from deterministically selecting the same node even
       // if the node failed to replicate the block on previous iterations
-      if(ThreadLocalRandom.current().nextBoolean())
-        srcNode = node;
+      if (!isStriped && ThreadLocalRandom.current().nextBoolean()) {
+        srcNodes.set(0, node);
+      }
     }
     if(numReplicas != null)
       numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
           excess, 0);
-    return srcNode;
+    return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
   }
 
   /**
@@ -1761,25 +1921,41 @@ public class BlockManager implements BlockStatsMXBean {
    * reported by the datanode in the block report. 
    */
   static class StatefulBlockInfo {
-    final BlockInfoContiguousUnderConstruction storedBlock;
+    final BlockInfo storedBlock; // should be UC block
     final Block reportedBlock;
     final ReplicaState reportedState;
     
-    StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock,
+    StatefulBlockInfo(BlockInfo storedBlock,
         Block reportedBlock, ReplicaState reportedState) {
+      Preconditions.checkArgument(
+          storedBlock instanceof BlockInfoContiguousUnderConstruction ||
+          storedBlock instanceof BlockInfoStripedUnderConstruction);
       this.storedBlock = storedBlock;
       this.reportedBlock = reportedBlock;
       this.reportedState = reportedState;
     }
   }
-  
+
+  private static class BlockInfoToAdd {
+    final BlockInfo stored;
+    final Block reported;
+
+    BlockInfoToAdd(BlockInfo stored, Block reported) {
+      this.stored = stored;
+      this.reported = reported;
+    }
+  }
+
   /**
    * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
    * list of blocks that should be considered corrupt due to a block report.
    */
   private static class BlockToMarkCorrupt {
-    /** The corrupted block in a datanode. */
-    final BlockInfo corrupted;
+    /**
+     * The corrupted block in a datanode. This is the one reported by the
+     * datanode.
+     */
+    final Block corrupted;
     /** The corresponding block stored in the BlockManager. */
     final BlockInfo stored;
     /** The reason to mark corrupt. */
@@ -1787,8 +1963,7 @@ public class BlockManager implements BlockStatsMXBean {
     /** The reason code to be stored */
     final Reason reasonCode;
 
-    BlockToMarkCorrupt(BlockInfo corrupted,
-        BlockInfo stored, String reason,
+    BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
         Reason reasonCode) {
       Preconditions.checkNotNull(corrupted, "corrupted is null");
       Preconditions.checkNotNull(stored, "stored is null");
@@ -1799,15 +1974,9 @@ public class BlockManager implements BlockStatsMXBean {
       this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, String reason,
-        Reason reasonCode) {
-      this(stored, stored, reason, reasonCode);
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
-        Reason reasonCode) {
-      this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
-          reason, reasonCode);
+    BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
+        String reason, Reason reasonCode) {
+      this(corrupted, stored, reason, reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
@@ -1921,8 +2090,8 @@ public class BlockManager implements BlockStatsMXBean {
       metrics.addBlockReport((int) (endTime - startTime));
     }
     blockLog.info("BLOCK* processReport: from storage {} node {}, " +
-        "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage
-        .getStorageID(), nodeID, newReport.getNumberOfBlocks(),
+            "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", 
storage
+            .getStorageID(), nodeID, newReport.getNumberOfBlocks(),
         node.hasStaleStorages(), (endTime - startTime));
     return !node.hasStaleStorages();
   }
@@ -1944,13 +2113,16 @@ public class BlockManager implements BlockStatsMXBean {
       // more than one storage on a datanode (and because it's a difficult
       // assumption to really enforce)
       removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+      Block b = getBlockOnStorage(block, zombie);
+      if (b != null) {
+        invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
+      }
     }
     assert(zombie.numBlocks() == 0);
     LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
             "which no longer exists on the DataNode.",
-            Long.toHexString(context.getReportId()), prevBlocks,
-            zombie.getStorageID());
+        Long.toHexString(context.getReportId()), prevBlocks,
+        zombie.getStorageID());
   }
 
   /**
@@ -1993,8 +2165,7 @@ public class BlockManager implements BlockStatsMXBean {
         if (i >= blocksPerRescan) {
           break;
         }
-
-        BlockInfo bi = blocksMap.getStoredBlock(b);
+        BlockInfo bi = getStoredBlock(b);
         if (bi == null) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@@ -2034,11 +2205,11 @@ public class BlockManager implements BlockStatsMXBean {
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
-    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new TreeSet<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new 
LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
+    Collection<BlockInfo> toRemove = new TreeSet<>();
+    Collection<Block> toInvalidate = new LinkedList<>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<>();
     reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
    
@@ -2047,12 +2218,13 @@ public class BlockManager implements BlockStatsMXBean {
     for (StatefulBlockInfo b : toUC) { 
       addStoredBlockUnderConstruction(b, storageInfo);
     }
-    for (Block b : toRemove) {
+    for (BlockInfo b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
-    for (BlockInfo b : toAdd) {
-      addStoredBlock(b, storageInfo, null, numBlocksLogged < 
maxNumBlocksToLog);
+    for (BlockInfoToAdd b : toAdd) {
+      addStoredBlock(b.stored, b.reported, storageInfo, null,
+          numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2073,17 +2245,18 @@ public class BlockManager implements BlockStatsMXBean {
    * Mark block replicas as corrupt except those on the storages in 
    * newStorages list.
    */
-  public void markBlockReplicasAsCorrupt(BlockInfo block,
+  public void markBlockReplicasAsCorrupt(Block oldBlock,
+      BlockInfo block,
       long oldGenerationStamp, long oldNumBytes, 
       DatanodeStorageInfo[] newStorages) throws IOException {
     assert namesystem.hasWriteLock();
     BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
-      b = new BlockToMarkCorrupt(block, oldGenerationStamp,
+      b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
           "genstamp does not match " + oldGenerationStamp
           + " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
     } else if (block.getNumBytes() != oldNumBytes) {
-      b = new BlockToMarkCorrupt(block,
+      b = new BlockToMarkCorrupt(oldBlock, block,
           "length does not match " + oldNumBytes
           + " : " + block.getNumBytes(), Reason.SIZE_MISMATCH);
     } else {
@@ -2141,8 +2314,8 @@ public class BlockManager implements BlockStatsMXBean {
             QUEUE_REASON_FUTURE_GENSTAMP);
         continue;
       }
-      
-      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+
+      BlockInfo storedBlock = getStoredBlock(iblk);
       // If block does not belong to any file, we are done.
       if (storedBlock == null) continue;
       
@@ -2165,38 +2338,38 @@ public class BlockManager implements BlockStatsMXBean {
       
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-        ((BlockInfoContiguousUnderConstruction)storedBlock)
-            .addReplicaIfNotPresent(storageInfo, iblk, reportedState);
+        final BlockInfoUnderConstruction uc = 
(BlockInfoUnderConstruction)storedBlock;
+        uc.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
         // refer HDFS-5283
-        BlockInfoContiguousUnderConstruction blockUC =
-            (BlockInfoContiguousUnderConstruction) storedBlock;
-        if (namesystem.isInSnapshot(blockUC)) {
-          int numOfReplicas = blockUC.getNumExpectedLocations();
-          namesystem.incrementSafeBlockCount(numOfReplicas);
+        if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
+          int numOfReplicas = uc.getNumExpectedLocations();
+          namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
         }
         //and fall through to next clause
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, storageInfo);
+        addStoredBlockImmediate(storedBlock, iblk, storageInfo);
       }
     }
   }
 
   private void reportDiff(DatanodeStorageInfo storageInfo, 
-      BlockListAsLongs newReport, 
-      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
-      Collection<Block> toRemove,           // remove from DatanodeDescriptor
+      BlockListAsLongs newReport,
+      Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
+      Collection<BlockInfo> toRemove,       // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1);
-    AddBlockResult result = storageInfo.addBlock(delimiter);
+    Block delimiterBlock = new Block();
+    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
+        (short) 1);
+    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
     assert result == AddBlockResult.ADDED 
         : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
@@ -2222,8 +2395,9 @@ public class BlockManager implements BlockStatsMXBean {
     // all of them are next to the delimiter
     Iterator<BlockInfo> it =
         storageInfo.new BlockIterator(delimiter.getNext(0));
-    while(it.hasNext())
+    while(it.hasNext()) {
       toRemove.add(it.next());
+    }
     storageInfo.removeBlock(delimiter);
   }
 
@@ -2260,9 +2434,9 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private BlockInfo processReportedBlock(
       final DatanodeStorageInfo storageInfo,
-      final Block block, final ReplicaState reportedState, 
-      final Collection<BlockInfo> toAdd,
-      final Collection<Block> toInvalidate, 
+      final Block block, final ReplicaState reportedState,
+      final Collection<BlockInfoToAdd> toAdd,
+      final Collection<Block> toInvalidate,
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
     
@@ -2282,7 +2456,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     
     // find block by blockId
-    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    BlockInfo storedBlock = getStoredBlock(block);
     if(storedBlock == null) {
       // If blocksMap does not contain reported block id,
       // the replica should be removed from the data-node.
@@ -2325,9 +2499,8 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-      toUC.add(new StatefulBlockInfo(
-          (BlockInfoContiguousUnderConstruction) storedBlock,
-          new Block(block), reportedState));
+      toUC.add(new StatefulBlockInfo(storedBlock, new Block(block),
+          reportedState));
       return storedBlock;
     }
 
@@ -2336,7 +2509,7 @@ public class BlockManager implements BlockStatsMXBean {
     if (reportedState == ReplicaState.FINALIZED
         && (storedBlock.findStorageInfo(storageInfo) == -1 ||
             corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      toAdd.add(storedBlock);
+      toAdd.add(new BlockInfoToAdd(storedBlock, block));
     }
     return storedBlock;
   }
@@ -2382,7 +2555,7 @@ public class BlockManager implements BlockStatsMXBean {
       if (rbi.getReportedState() == null) {
         // This is a DELETE_BLOCK request
         DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
-        removeStoredBlock(rbi.getBlock(),
+        removeStoredBlock(getStoredBlock(rbi.getBlock()),
             storageInfo.getDatanodeDescriptor());
       } else {
         processAndHandleReportedBlock(rbi.getStorageInfo(),
@@ -2430,12 +2603,27 @@ public class BlockManager implements BlockStatsMXBean {
       case COMMITTED:
         if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) 
{
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(storedBlock, reportedGS,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock, 
reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
               + " does not match genstamp in block map "
               + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
-        } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
-          return new BlockToMarkCorrupt(storedBlock,
+        }
+        boolean wrongSize;
+        if (storedBlock.isStriped()) {
+          assert BlockIdManager.isStripedBlockID(reported.getBlockId());
+          assert storedBlock.getBlockId() ==
+              BlockIdManager.convertToStripedID(reported.getBlockId());
+          BlockInfoStriped stripedBlock = (BlockInfoStriped) storedBlock;
+          int reportedBlkIdx = BlockIdManager.getBlockIndex(reported);
+          wrongSize = reported.getNumBytes() !=
+              getInternalBlockLength(stripedBlock.getNumBytes(),
+                  BLOCK_STRIPED_CELL_SIZE,
+                  stripedBlock.getDataBlockNum(), reportedBlkIdx);
+        } else {
+          wrongSize = storedBlock.getNumBytes() != reported.getNumBytes();
+        }
+        if (wrongSize) {
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "block is " + ucState + " and reported length " +
               reported.getNumBytes() + " does not match " +
               "length in block map " + storedBlock.getNumBytes(),
@@ -2446,8 +2634,8 @@ public class BlockManager implements BlockStatsMXBean {
       case UNDER_CONSTRUCTION:
         if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
-              + ucState + " and reported state " + reportedState
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock, 
reportedGS,
+              "block is " + ucState + " and reported state " + reportedState
               + ", But reported genstamp " + reportedGS
               + " does not match genstamp in block map "
               + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2462,7 +2650,7 @@ public class BlockManager implements BlockStatsMXBean {
         return null; // not corrupt
       } else if (storedBlock.getGenerationStamp() != 
reported.getGenerationStamp()) {
         final long reportedGS = reported.getGenerationStamp();
-        return new BlockToMarkCorrupt(storedBlock, reportedGS,
+        return new BlockToMarkCorrupt(new Block(reported), storedBlock, 
reportedGS,
             "reported " + reportedState + " replica with genstamp " + 
reportedGS
             + " does not match COMPLETE block's genstamp in block map "
             + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2477,7 +2665,7 @@ public class BlockManager implements BlockStatsMXBean {
               "complete with the same genstamp");
           return null;
         } else {
-          return new BlockToMarkCorrupt(storedBlock,
+          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
               "reported replica has invalid state " + reportedState,
               Reason.INVALID_STATE);
         }
@@ -2490,7 +2678,8 @@ public class BlockManager implements BlockStatsMXBean {
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
+      return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg,
+          Reason.INVALID_STATE);
     }
   }
 
@@ -2517,13 +2706,14 @@ public class BlockManager implements BlockStatsMXBean {
 
   void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
       DatanodeStorageInfo storageInfo) throws IOException {
-    BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock;
-    block.addReplicaIfNotPresent(
-        storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
+    BlockInfo block = ucBlock.storedBlock;
+    final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block;
+    uc.addReplicaIfNotPresent(storageInfo, ucBlock.reportedBlock,
+        ucBlock.reportedState);
 
     if (ucBlock.reportedState == ReplicaState.FINALIZED &&
         (block.findStorageInfo(storageInfo) < 0)) {
-      addStoredBlock(block, storageInfo, null, true);
+      addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
     }
   } 
 
@@ -2538,39 +2728,40 @@ public class BlockManager implements BlockStatsMXBean {
    * 
    * @throws IOException
    */
-  private void addStoredBlockImmediate(BlockInfo storedBlock,
+  private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, storageInfo, null, false);
+      addStoredBlock(storedBlock, reported, storageInfo, null, false);
       return;
     }
 
     // just add it
-    AddBlockResult result = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
-        && numCurrentReplica >= minReplication) {
+        && hasMinStorage(storedBlock, numCurrentReplica)) {
       completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that.
       // In the case that the block just became complete above, completeBlock()
       // handles the safe block count maintenance.
-      namesystem.incrementSafeBlockCount(numCurrentReplica);
+      namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
     }
   }
 
   /**
    * Modify (block-->datanode) map. Remove block from set of
    * needed replications if this takes care of the problem.
-   * @return the block that is stored in blockMap.
+   * @return the block that is stored in blocksMap.
    */
   private Block addStoredBlock(final BlockInfo block,
+                               final Block reportedBlock,
                                DatanodeStorageInfo storageInfo,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
@@ -2578,9 +2769,10 @@ public class BlockManager implements BlockStatsMXBean {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
-    if (block instanceof BlockInfoContiguousUnderConstruction) {
+    if (block instanceof BlockInfoContiguousUnderConstruction ||
+        block instanceof BlockInfoStripedUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
-      storedBlock = blocksMap.getStoredBlock(block);
+      storedBlock = getStoredBlock(block);
     } else {
       storedBlock = block;
     }
@@ -2594,10 +2786,9 @@ public class BlockManager implements BlockStatsMXBean {
       return block;
     }
     BlockCollection bc = storedBlock.getBlockCollection();
-    assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    AddBlockResult result = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
@@ -2628,7 +2819,7 @@ public class BlockManager implements BlockStatsMXBean {
       + pendingReplications.getNumReplicas(storedBlock);
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
-        numLiveReplicas >= minReplication) {
+        hasMinStorage(storedBlock, numLiveReplicas)) {
       storedBlock = completeBlock(bc, storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -2636,7 +2827,7 @@ public class BlockManager implements BlockStatsMXBean {
       // Is no-op if not in safe mode.
       // In the case that the block just became complete above, completeBlock()
       // handles the safe block count maintenance.
-      namesystem.incrementSafeBlockCount(numCurrentReplica);
+      namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock);
     }
     
     // if file is under construction, then done for now
@@ -2650,7 +2841,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // handle underReplication/overReplication
-    short fileReplication = bc.getPreferredBlockReplication();
+    short fileReplication = getExpectedReplicaNum(bc, storedBlock);
     if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) 
{
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedAndDecommissioning(), fileReplication);
@@ -2666,11 +2857,12 @@ public class BlockManager implements BlockStatsMXBean {
     int numCorruptNodes = num.corruptReplicas();
     if (numCorruptNodes != corruptReplicasCount) {
       LOG.warn("Inconsistent number of corrupt replicas for " +
-          storedBlock + "blockMap has " + numCorruptNodes + 
+          storedBlock + ". blockMap has " + numCorruptNodes +
           " but corrupt replicas map has " + corruptReplicasCount);
     }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
-      invalidateCorruptReplicas(storedBlock);
+    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
+      invalidateCorruptReplicas(storedBlock, reportedBlock, num);
+    }
     return storedBlock;
   }
 
@@ -2702,18 +2894,20 @@ public class BlockManager implements BlockStatsMXBean {
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
+      NumberReplicas numberReplicas) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
       return;
     // make a copy of the array of nodes in order to avoid
     // ConcurrentModificationException, when the block is removed from the node
-    DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
+    DatanodeDescriptor[] nodesCopy =
+        nodes.toArray(new DatanodeDescriptor[nodes.size()]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
-              Reason.ANY), node)) {
+        if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
+            Reason.ANY), node, numberReplicas)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
@@ -2864,6 +3058,15 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Get the value of whether there are any non-EC blocks using StripedID.
+   *
+   * @return Returns the value of whether there are any non-EC blocks using 
StripedID.
+   */
+  public boolean hasNonEcBlockUsingStripedID(){
+    return hasNonEcBlockUsingStripedID;
+  }
+
+  /**
    * Process a single possibly misreplicated block. This adds it to the
    * appropriate queues if necessary, and returns a result code indicating
    * what happened with it.
@@ -2881,7 +3084,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     // calculate current replication
     short expectedReplication =
-        block.getBlockCollection().getPreferredBlockReplication();
+        getExpectedReplicaNum(block.getBlockCollection(), block);
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
@@ -2940,14 +3143,14 @@ public class BlockManager implements BlockStatsMXBean {
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  private void processOverReplicatedBlock(final Block block,
+  private void processOverReplicatedBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
-    Collection<DatanodeStorageInfo> nonExcess = new 
ArrayList<DatanodeStorageInfo>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block, 
State.NORMAL)) {
@@ -2961,8 +3164,8 @@ public class BlockManager implements BlockStatsMXBean {
         postponeBlock(block);
         return;
       }
-      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
-          .getDatanodeUuid());
+      LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+          cur.getDatanodeUuid());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
@@ -2972,10 +3175,29 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint, blockplacement);
+    chooseExcessReplicates(nonExcess, block, replication, addedNode,
+        delNodeHint);
   }
 
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
+    assert namesystem.hasWriteLock();
+    // first form a rack to datanodes map and
+    BlockCollection bc = getBlockCollection(storedBlock);
+    if (storedBlock.isStriped()) {
+      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint);
+    } else {
+      final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+          bc.getStoragePolicyID());
+      final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+          replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+      chooseExcessReplicasContiguous(bc, nonExcess, storedBlock,
+          replication, addedNode, delNodeHint, excessTypes);
+    }
+  }
 
   /**
    * We want "replication" replicates for the block, but we now have too many. 
 
@@ -2991,23 +3213,16 @@ public class BlockManager implements BlockStatsMXBean {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> 
nonExcess, 
-                              Block b, short replication,
-                              DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint,
-                              BlockPlacementPolicy replicator) {
-    assert namesystem.hasWriteLock();
-    // first form a rack to datanodes map and
-    BlockCollection bc = getBlockCollection(b);
-    final BlockStoragePolicy storagePolicy = 
storagePolicySuite.getPolicy(bc.getStoragePolicyID());
-    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
-        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-
-
-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
-    final List<DatanodeStorageInfo> moreThanOne = new 
ArrayList<DatanodeStorageInfo>();
-    final List<DatanodeStorageInfo> exactlyOne = new 
ArrayList<DatanodeStorageInfo>();
+  private void chooseExcessReplicasContiguous(BlockCollection bc,
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint,
+      List<StorageType> excessTypes) {
+    BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
+    final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
     
     // split nodes into two sets
     // moreThanOne contains nodes on rack with more than one replica
@@ -3028,33 +3243,111 @@ public class BlockManager implements BlockStatsMXBean {
           moreThanOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(bc, b, replication,
+        cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
             moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
-
       // adjust rackmap, moreThanOne, and exactlyOne
       replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
           exactlyOne, cur);
 
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
+      processChosenExcessReplica(nonExcess, cur, storedBlock);
+    }
+  }
 
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.  
-      //
-      // The 'invalidate' list is used to inform the datanode the block 
-      // should be deleted.  Items are removed from the invalidate list
-      // upon giving instructions to the namenode.
-      //
-      addToInvalidates(b, cur.getDatanodeDescriptor());
-      blockLog.debug("BLOCK* chooseExcessReplicates: "
-                +"({}, {}) is added to invalidated blocks set", cur, b);
+  /**
+   * We want block group has every internal block, but we have redundant
+   * internal blocks (which have the same index).
+   * In this method, we delete the redundant internal blocks until only one
+   * left for each index.
+   *
+   * The block placement policy will make sure that the left internal blocks 
are
+   * spread across racks and also try hard to pick one with least free space.
+   */
+  private void chooseExcessReplicasStriped(BlockCollection bc,
+      final Collection<DatanodeStorageInfo> nonExcess,
+      BlockInfo storedBlock,
+      DatanodeDescriptor delNodeHint) {
+    assert storedBlock instanceof BlockInfoStriped;
+    BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
+    short groupSize = sblk.getTotalBlockNum();
+    BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
+    List<DatanodeStorageInfo> empty = new ArrayList<>(0);
+
+    // find all duplicated indices
+    BitSet found = new BitSet(groupSize); //indices found
+    BitSet duplicated = new BitSet(groupSize); //indices found more than once
+    HashMap<DatanodeStorageInfo, Integer> storage2index = new HashMap<>();
+    for (DatanodeStorageInfo storage : nonExcess) {
+      int index = sblk.getStorageBlockIndex(storage);
+      assert index >= 0;
+      if (found.get(index)) {
+        duplicated.set(index);
+      }
+      found.set(index);
+      storage2index.put(storage, index);
+    }
+    // the number of target left replicas equals to the of number of the found
+    // indices.
+    int numOfTarget = found.cardinality();
+
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        (short)numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess));
+
+    // use delHint only if delHint is duplicated
+    final DatanodeStorageInfo delStorageHint =
+        DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+    if (delStorageHint != null) {
+      Integer index = storage2index.get(delStorageHint);
+      if (index != null && duplicated.get(index)) {
+        processChosenExcessReplica(nonExcess, delStorageHint, storedBlock);
+      }
+    }
+
+    // for each duplicated index, delete some replicas until only one left
+    for (int targetIndex = duplicated.nextSetBit(0); targetIndex >= 0;
+         targetIndex = duplicated.nextSetBit(targetIndex + 1)) {
+      List<DatanodeStorageInfo> candidates = new ArrayList<>();
+      for (DatanodeStorageInfo storage : nonExcess) {
+        int index = storage2index.get(storage);
+        if (index == targetIndex) {
+          candidates.add(storage);
+        }
+      }
+      Block internalBlock = new Block(storedBlock);
+      internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
+      while (candidates.size() > 1) {
+        DatanodeStorageInfo target = placementPolicy.chooseReplicaToDelete(bc,
+            internalBlock, (short)1, candidates, empty, excessTypes);
+        processChosenExcessReplica(nonExcess, target, storedBlock);
+        candidates.remove(target);
+      }
+      duplicated.clear(targetIndex);
     }
   }
 
+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted.  Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
+    addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
+    blockLog.debug("BLOCK* chooseExcessReplicates: "
+        + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+  }
+
   /** Check if we can use delHint */
   static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
       DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
@@ -3076,17 +3369,18 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private void addToExcessReplicate(DatanodeInfo dn, Block block) {
+  private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
     assert namesystem.hasWriteLock();
-    LightWeightLinkedSet<Block> excessBlocks = 
excessReplicateMap.get(dn.getDatanodeUuid());
+    LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+        dn.getDatanodeUuid());
     if (excessBlocks == null) {
-      excessBlocks = new LightWeightLinkedSet<Block>();
+      excessBlocks = new LightWeightLinkedSet<>();
       excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
     }
-    if (excessBlocks.add(block)) {
+    if (excessBlocks.add(storedBlock)) {
       excessBlocksCount.incrementAndGet();
       blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
-          + " excessReplicateMap", dn, block);
+          + " excessReplicateMap", dn, storedBlock);
     }
   }
 
@@ -3098,26 +3392,25 @@ public class BlockManager implements BlockStatsMXBean {
           QUEUE_REASON_FUTURE_GENSTAMP);
       return;
     }
-    removeStoredBlock(block, node);
+    removeStoredBlock(getStoredBlock(block), node);
   }
 
   /**
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
-    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
+  public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor 
node) {
+    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
     assert (namesystem.hasWriteLock());
     {
-      BlockInfo storedBlock = getStoredBlock(block);
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
         blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
-            " removed from node {}", block, node);
+            " removed from node {}", storedBlock, node);
         return;
       }
 
       CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
-          .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+          .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
       if (cblock != null) {
         boolean removed = false;
         removed |= node.getPendingCached().remove(cblock);
@@ -3125,7 +3418,7 @@ public class BlockManager implements BlockStatsMXBean {
         removed |= node.getPendingUncached().remove(cblock);
         if (removed) {
           blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
-              + "related lists on node {}", block, node);
+              + "related lists on node {}", storedBlock, node);
         }
       }
 
@@ -3135,7 +3428,7 @@ public class BlockManager implements BlockStatsMXBean {
       // necessary. In that case, put block on a possibly-will-
       // be-replicated list.
       //
-      BlockCollection bc = blocksMap.getBlockCollection(block);
+      BlockCollection bc = storedBlock.getBlockCollection();
       if (bc != null) {
         namesystem.decrementS

<TRUNCATED>

Reply via email to