HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with length less than the first internal block. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/95363bcc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/95363bcc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/95363bcc

Branch: refs/heads/YARN-3926
Commit: 95363bcc7dae28ba9ae2cd7ee9a258fcb58cd932
Parents: 34a3900
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Jan 22 09:46:02 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Jan 22 09:46:02 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  48 ++++--
 .../hadoop/hdfs/DFSStripedInputStream.java      |   2 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  13 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../erasurecode/ErasureCodingWorker.java        | 146 +++++++++++-----
 .../server/protocol/BlockECRecoveryCommand.java |   2 +-
 .../hdfs/TestReadStripedFileWithDecoding.java   |  17 +-
 .../hadoop/hdfs/TestRecoverStripedFile.java     | 172 ++++++++++++-------
 8 files changed, 270 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
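
Background on the failure mode, as a minimal standalone sketch (it assumes the default 6-data/3-parity schema with 64 KB cells and is not code from this patch): in a striped block group the internal blocks can have different lengths when the data does not fill every cell, so the first internal block can be longer than the block being recovered. The old worker loop ran up to the first internal block's length; the change below bounds the loop by the longest target block instead.

public class InternalBlockLengthSketch {
  // Assumed layout parameters, for illustration only.
  static final int CELL_SIZE = 64 * 1024;
  static final int DATA_BLOCKS = 6;

  // Simplified length of a *data* internal block: the block group's data is
  // striped round-robin in cell-sized chunks across the data blocks.
  static long internalDataBlockLength(long groupDataSize, int blockIndex) {
    long fullStripes = groupDataSize / ((long) CELL_SIZE * DATA_BLOCKS);
    long remainder = groupDataSize - fullStripes * CELL_SIZE * DATA_BLOCKS;
    long lastCell =
        Math.min(Math.max(remainder - (long) blockIndex * CELL_SIZE, 0), CELL_SIZE);
    return fullStripes * CELL_SIZE + lastCell;
  }

  public static void main(String[] args) {
    long groupDataSize = 100 * 1024; // a block group holding only 100 KB of data
    for (int i = 0; i < DATA_BLOCKS; i++) {
      System.out.println("internal block " + i + ": "
          + internalDataBlockLength(groupDataSize, i) + " bytes");
    }
    // Prints 65536, 36864, 0, 0, 0, 0: looping up to the length of internal
    // block 0 overruns every shorter recovery target, which is the case the
    // maxTargetLength change in ErasureCodingWorker addresses.
  }
}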


http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3de60b2..3c91ca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -32,7 +32,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -909,7 +908,8 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off,
+      int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -959,7 +959,7 @@ public class DFSInputStream extends FSInputStream
           // Check if need to report block replicas corruption either read
           // was successful or ChecksumException occured.
           reportCheckSumFailure(corruptedBlockMap,
-              currentLocatedBlock.getLocations().length);
+              currentLocatedBlock.getLocations().length, false);
         }
       }
     }
@@ -1492,7 +1492,8 @@ public class DFSInputStream extends FSInputStream
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
         // corrupted.
-        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length,
+            false);
       }
 
       remaining -= bytesToRead;
@@ -1508,6 +1509,7 @@ public class DFSInputStream extends FSInputStream
 
   /**
    * DFSInputStream reports checksum failure.
+   * For replicated blocks, we have the following logic:
    * Case I : client has tried multiple data nodes and at least one of the
    * attempts has succeeded. We report the other failures as corrupted block to
    * namenode.
@@ -1515,29 +1517,39 @@ public class DFSInputStream extends FSInputStream
    * only report if the total number of replica is 1. We do not
   * report otherwise since this maybe due to the client is a handicapped client
    * (who can not read).
+   *
+   * For erasure-coded blocks, each block in corruptedBlockMap is an internal
+   * block in a block group, and there is usually only one DataNode
+   * corresponding to each internal block. For this case we simply report the
+   * corrupted blocks to NameNode and ignore the above logic.
+   *
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
   protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
-      int dataNodeCount) {
+      int dataNodeCount, boolean isStriped) {
     if (corruptedBlockMap.isEmpty()) {
       return;
     }
-    Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
-        .entrySet().iterator();
-    Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
-    ExtendedBlock blk = entry.getKey();
-    Set<DatanodeInfo> dnSet = entry.getValue();
-    if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
-        || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
-      DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
-      int i = 0;
-      for (DatanodeInfo dn:dnSet) {
-        locs[i++] = dn;
+    List<LocatedBlock> reportList = new ArrayList<>(corruptedBlockMap.size());
+    for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+        corruptedBlockMap.entrySet()) {
+      ExtendedBlock blk = entry.getKey();
+      Set<DatanodeInfo> dnSet = entry.getValue();
+      if (isStriped || ((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+          || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+        DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+        int i = 0;
+        for (DatanodeInfo dn:dnSet) {
+          locs[i++] = dn;
+        }
+        reportList.add(new LocatedBlock(blk, locs));
       }
-      LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
-      dfsClient.reportChecksumFailure(src, lblocks);
+    }
+    if (reportList.size() > 0) {
+      dfsClient.reportChecksumFailure(src,
+          reportList.toArray(new LocatedBlock[reportList.size()]));
     }
     corruptedBlockMap.clear();
   }
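
The per-block decision made by the updated reportCheckSumFailure above can be summed up by the condition it evaluates; the standalone sketch below (a hypothetical helper, not part of DFSInputStream) restates that condition and shows the intended outcomes for a few assumed replica counts.

public class ReportConditionSketch {
  // Same boolean structure as the reporting condition in the hunk above.
  static boolean shouldReport(boolean isStriped, int corruptReplicas,
      int dataNodeCount) {
    return isStriped
        || (corruptReplicas > 0 && corruptReplicas < dataNodeCount)
        || (dataNodeCount == 1 && corruptReplicas == dataNodeCount);
  }

  public static void main(String[] args) {
    // A corrupted internal block of a striped group usually has a single
    // location, so it is always reported.
    System.out.println(shouldReport(true, 1, 1));   // true
    // Replicated block, 1 of 3 replicas corrupt: report the bad replica.
    System.out.println(shouldReport(false, 1, 3));  // true
    // Replicated block, all 3 replicas corrupt: likely a reader problem,
    // not reported (unchanged behavior).
    System.out.println(shouldReport(false, 3, 3));  // false
  }
}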

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7433256..d15e536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -451,7 +451,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         // Check if need to report block replicas corruption either read
         // was successful or ChecksumException occured.
         reportCheckSumFailure(corruptedBlockMap,
-            currentLocatedBlock.getLocations().length);
+            currentLocatedBlock.getLocations().length, true);
       }
     }
     return -1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index e8653c8..dbd53a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -35,6 +34,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -72,6 +73,8 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Private
 public class StripedBlockUtil {
 
+  public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+
   /**
    * This method parses a striped block group into individual blocks.
    *
@@ -221,15 +224,11 @@ public class StripedBlockUtil {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Exception during striped read task", e);
-      }
+      LOG.debug("Exception during striped read task", e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Exception during striped read task", e);
-      }
+      LOG.debug("Exception during striped read task", e);
       return new StripingChunkReadResult(futures.remove(future),
           StripingChunkReadResult.CANCELLED);
     }
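
Dropping the isDebugEnabled() guard above is safe here because the SLF4J debug(String, Throwable) call checks the level internally and the message is a constant, so there is nothing costly to short-circuit. A minimal sketch of the equivalent pattern (hypothetical class, not from this patch):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(DebugGuardSketch.class);

  static void onStripedReadFailure(Exception e) {
    // Before: if (LOG.isDebugEnabled()) { LOG.debug("...", e); }
    // The explicit guard only pays off when building the message is
    // expensive; for a constant message plus throwable, debug() is enough.
    LOG.debug("Exception during striped read task", e);
  }

  public static void main(String[] args) {
    onStripedReadFailure(new Exception("simulated read failure"));
  }
}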

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 43adf62..940fa90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -406,6 +406,9 @@ Trunk (Unreleased)
     HDFS-9615. Fix variable name typo in DFSConfigKeys. (Ray Chiang via
     Arpit Agarwal)
 
+    HDFS-9646. ErasureCodingWorker may fail when recovering data blocks with
+    length less than the first internal block. (jing9)
+
     BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
       HDFS-7347. Configurable erasure coding policy for individual files and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 5588eec..6ad7164 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -32,8 +32,10 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -46,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -121,9 +124,8 @@ public final class ErasureCodingWorker {
   }
 
   private void initializeStripedReadThreadPool(int num) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using striped reads; pool threads=" + num);
-    }
+    LOG.debug("Using striped reads; pool threads=" + num);
+
     STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
@@ -148,9 +150,7 @@ public final class ErasureCodingWorker {
   }
 
   private void initializeStripedBlkRecoveryThreadPool(int num) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using striped block recovery; pool threads=" + num);
-    }
+    LOG.debug("Using striped block recovery; pool threads=" + num);
     STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
         new Daemon.DaemonFactory() {
@@ -368,11 +368,11 @@ public final class ErasureCodingWorker {
      * @return StripedReader
      */
     private StripedReader addStripedReader(int i, long offsetInBlock) {
-      StripedReader reader = new StripedReader(liveIndices[i]);
+      final ExtendedBlock block = getBlock(blockGroup, liveIndices[i]);
+      StripedReader reader = new StripedReader(liveIndices[i], block, sources[i]);
       stripedReaders.add(reader);
 
-      BlockReader blockReader = newBlockReader(
-          getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+      BlockReader blockReader = newBlockReader(block, offsetInBlock, sources[i]);
       if (blockReader != null) {
         initChecksumAndBufferSizeIfNeeded(blockReader);
         reader.blockReader = blockReader;
@@ -435,19 +435,27 @@ public final class ErasureCodingWorker {
           throw new IOException(error);
         }
 
-        long firstStripedBlockLength = getBlockLen(blockGroup, 0);
-        while (positionInBlock < firstStripedBlockLength) {
-          int toRead = Math.min(
-              bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+        long maxTargetLength = 0;
+        for (short targetIndex : targetIndices) {
+          maxTargetLength = Math.max(maxTargetLength,
+              getBlockLen(blockGroup, targetIndex));
+        }
+        while (positionInBlock < maxTargetLength) {
+          final int toRecover = (int) Math.min(
+              bufferSize, maxTargetLength - positionInBlock);
           // step1: read from minimum source DNs required for reconstruction.
-          //   The returned success list is the source DNs we do real read from
-          success = readMinimumStripedData4Recovery(success);
+          // The returned success list is the source DNs we do real read from
+          Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>();
+          try {
+            success = readMinimumStripedData4Recovery(success, toRecover,
+                corruptionMap);
+          } finally {
+            // report corrupted blocks to NN
+            reportCorruptedBlocks(corruptionMap);
+          }
 
           // step2: decode to reconstruct targets
-          long remaining = firstStripedBlockLength - positionInBlock;
-          int toRecoverLen = remaining < bufferSize ?
-              (int)remaining : bufferSize;
-          recoverTargets(success, targetsStatus, toRecoverLen);
+          recoverTargets(success, targetsStatus, toRecover);
 
           // step3: transfer data
           if (transferData2Targets(targetsStatus) == 0) {
@@ -456,7 +464,7 @@ public final class ErasureCodingWorker {
           }
 
           clearBuffers();
-          positionInBlock += toRead;
+          positionInBlock += toRecover;
         }
 
         endTargetBlocks(targetsStatus);
@@ -513,10 +521,11 @@ public final class ErasureCodingWorker {
       }
     }
 
-    private long getReadLength(int index) {
+    /** the reading length should not exceed the length for recovery */
+    private int getReadLength(int index, int recoverLength) {
       long blockLen = getBlockLen(blockGroup, index);
       long remaining = blockLen - positionInBlock;
-      return remaining > bufferSize ? bufferSize : remaining;
+      return (int) Math.min(remaining, recoverLength);
     }
 
     /**
@@ -529,11 +538,15 @@ public final class ErasureCodingWorker {
      * operations and next iteration read.
      * 
      * @param success the initial success list of source DNs we think best
+     * @param recoverLength the length to recover.
      * @return updated success list of source DNs we do real read
      * @throws IOException
      */
-    private int[] readMinimumStripedData4Recovery(final int[] success)
+    private int[] readMinimumStripedData4Recovery(final int[] success,
+        int recoverLength, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
         throws IOException {
+      Preconditions.checkArgument(recoverLength >= 0 &&
+          recoverLength <= bufferSize);
       int nsuccess = 0;
       int[] newSuccess = new int[minRequiredSources];
       BitSet used = new BitSet(sources.length);
@@ -543,9 +556,11 @@ public final class ErasureCodingWorker {
        */
       for (int i = 0; i < minRequiredSources; i++) {
         StripedReader reader = stripedReaders.get(success[i]);
-        if (getReadLength(liveIndices[success[i]]) > 0) {
-          Callable<Void> readCallable = readFromBlock(
-              reader.blockReader, reader.buffer);
+        final int toRead = getReadLength(liveIndices[success[i]],
+            recoverLength);
+        if (toRead > 0) {
+          Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+              toRead, corruptionMap);
           Future<Void> f = readService.submit(readCallable);
           futures.put(f, success[i]);
         } else {
@@ -570,10 +585,10 @@ public final class ErasureCodingWorker {
             StripedReader failedReader = stripedReaders.get(result.index);
             closeBlockReader(failedReader.blockReader);
             failedReader.blockReader = null;
-            resultIndex = scheduleNewRead(used);
+            resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
           } else if (result.state == StripingChunkReadResult.TIMEOUT) {
             // If timeout, we also schedule a new read.
-            resultIndex = scheduleNewRead(used);
+            resultIndex = scheduleNewRead(used, recoverLength, corruptionMap);
           }
           if (resultIndex >= 0) {
             newSuccess[nsuccess++] = resultIndex;
@@ -601,6 +616,9 @@ public final class ErasureCodingWorker {
     }
     
     private void paddingBufferToLen(ByteBuffer buffer, int len) {
+      if (len > buffer.limit()) {
+        buffer.limit(len);
+      }
       int toPadding = len - buffer.position();
       for (int i = 0; i < toPadding; i++) {
         buffer.put((byte) 0);
@@ -648,8 +666,8 @@ public final class ErasureCodingWorker {
       int m = 0;
       for (int i = 0; i < targetBuffers.length; i++) {
         if (targetsStatus[i]) {
+          targetBuffers[i].limit(toRecoverLen);
           outputs[m++] = targetBuffers[i];
-          outputs[i].limit(toRecoverLen);
         }
       }
       decoder.decode(inputs, erasedIndices, outputs);
@@ -658,7 +676,7 @@ public final class ErasureCodingWorker {
         if (targetsStatus[i]) {
           long blockLen = getBlockLen(blockGroup, targetIndices[i]);
           long remaining = blockLen - positionInBlock;
-          if (remaining < 0) {
+          if (remaining <= 0) {
             targetBuffers[i].limit(0);
           } else if (remaining < toRecoverLen) {
             targetBuffers[i].limit((int)remaining);
@@ -678,16 +696,19 @@ public final class ErasureCodingWorker {
      * @param used the used source DNs in this iteration.
      * @return the array index of source DN if don't need to do real read.
      */
-    private int scheduleNewRead(BitSet used) {
+    private int scheduleNewRead(BitSet used, int recoverLength,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
       StripedReader reader = null;
       // step1: initially we may only have <code>minRequiredSources</code>
       // number of StripedReader, and there may be some source DNs we never 
       // read before, so will try to create StripedReader for one new source DN
       // and try to read from it. If found, go to step 3.
       int m = stripedReaders.size();
+      int toRead = 0;
       while (reader == null && m < sources.length) {
         reader = addStripedReader(m, positionInBlock);
-        if (getReadLength(liveIndices[m]) > 0) {
+        toRead = getReadLength(liveIndices[m], recoverLength);
+        if (toRead > 0) {
           if (reader.blockReader == null) {
             reader = null;
             m++;
@@ -706,12 +727,14 @@ public final class ErasureCodingWorker {
       for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
         if (!used.get(i)) {
           StripedReader r = stripedReaders.get(i);
-          if (getReadLength(liveIndices[i]) > 0) {
+          toRead = getReadLength(liveIndices[i], recoverLength);
+          if (toRead > 0) {
             closeBlockReader(r.blockReader);
             r.blockReader = newBlockReader(
                 getBlock(blockGroup, liveIndices[i]), positionInBlock,
                 sources[i]);
             if (r.blockReader != null) {
+              r.buffer.position(0);
               m = i;
               reader = r;
             }
@@ -725,8 +748,8 @@ public final class ErasureCodingWorker {
 
       // step3: schedule if find a correct source DN and need to do real read.
       if (reader != null) {
-        Callable<Void> readCallable = readFromBlock(
-            reader.blockReader, reader.buffer);
+        Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
+            toRead, corruptionMap);
         Future<Void> f = readService.submit(readCallable);
         futures.put(f, m);
         used.set(m);
@@ -742,15 +765,22 @@ public final class ErasureCodingWorker {
       }
     }
 
-    private Callable<Void> readFromBlock(final BlockReader reader,
-        final ByteBuffer buf) {
+    private Callable<Void> readFromBlock(final StripedReader reader,
+        final ByteBuffer buf, final int length,
+        final Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
       return new Callable<Void>() {
 
         @Override
         public Void call() throws Exception {
           try {
-            actualReadFromBlock(reader, buf);
+            buf.limit(length);
+            actualReadFromBlock(reader.blockReader, buf);
             return null;
+          } catch (ChecksumException e) {
+            LOG.warn("Found Checksum error for " + reader.block + " from "
+                + reader.source + " at " + e.getPos());
+            addCorruptedBlock(reader.block, reader.source, corruptionMap);
+            throw e;
           } catch (IOException e) {
             LOG.info(e.getMessage());
             throw e;
@@ -760,6 +790,30 @@ public final class ErasureCodingWorker {
       };
     }
 
+    private void reportCorruptedBlocks(
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) throws IOException {
+      if (!corruptionMap.isEmpty()) {
+        for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+            corruptionMap.entrySet()) {
+          for (DatanodeInfo dnInfo : entry.getValue()) {
+            datanode.reportRemoteBadBlock(dnInfo, entry.getKey());
+          }
+        }
+      }
+    }
+
+    private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
+      Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
+      if (dnSet == null) {
+        dnSet = new HashSet<>();
+        corruptionMap.put(blk, dnSet);
+      }
+      if (!dnSet.contains(node)) {
+        dnSet.add(node);
+      }
+    }
+
     /**
      * Read bytes from block
      */
@@ -900,14 +954,14 @@ public final class ErasureCodingWorker {
       }
 
       if (zeroStripeBuffers != null) {
-        for (int i = 0; i < zeroStripeBuffers.length; i++) {
-          zeroStripeBuffers[i].clear();
+        for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
+          zeroStripeBuffer.clear();
         }
       }
 
-      for (int i = 0; i < targetBuffers.length; i++) {
-        if (targetBuffers[i] != null) {
-          targetBuffers[i].clear();
+      for (ByteBuffer targetBuffer : targetBuffers) {
+        if (targetBuffer != null) {
+          targetBuffer.clear();
         }
       }
     }
@@ -998,9 +1052,13 @@ public final class ErasureCodingWorker {
     private final short index; // internal block index
     private BlockReader blockReader;
     private ByteBuffer buffer;
+    private final ExtendedBlock block;
+    private final DatanodeInfo source;
 
-    private StripedReader(short index) {
+    StripedReader(short index, ExtendedBlock block, DatanodeInfo source) {
       this.index = index;
+      this.block = block;
+      this.source = source;
     }
   }
 }
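
One subtle part of the worker changes above is paddingBufferToLen raising the buffer limit before padding: readFromBlock now sets buf.limit(length) to the possibly shorter per-iteration read length, and ByteBuffer.put will not write past the limit. A standalone illustration with assumed buffer sizes (not code from the patch):

import java.nio.ByteBuffer;

public class PaddingSketch {
  static void paddingBufferToLen(ByteBuffer buffer, int len) {
    // Mirrors the patched helper: raise the limit first, otherwise put()
    // throws BufferOverflowException once position reaches the old limit.
    if (len > buffer.limit()) {
      buffer.limit(len);
    }
    int toPadding = len - buffer.position();
    for (int i = 0; i < toPadding; i++) {
      buffer.put((byte) 0);
    }
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(64);
    buf.limit(16);     // simulate a short read from a short internal block
    buf.position(16);  // everything up to the limit was filled by the read
    paddingBufferToLen(buf, 48);  // pad out to the decode length
    System.out.println("position=" + buf.position() + " limit=" + buf.limit());
  }
}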

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
index 1cb74b3..d0c1786 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -136,7 +136,7 @@ public class BlockECRecoveryCommand extends DatanodeCommand {
           .append("Recovering ").append(block).append(" From: ")
           .append(Arrays.asList(sources)).append(" To: [")
           .append(Arrays.asList(targets)).append(")\n")
-          .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices))
+          .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices))
           .toString();
     }
   }
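
The toString fix above matters because Arrays.asList wraps a primitive array as a single list element, so printing it yields an array identity rather than the index values. A standalone illustration (assuming the indices are a short[], as the surrounding code suggests):

import java.util.Arrays;

public class IndicesToStringSketch {
  public static void main(String[] args) {
    short[] liveBlockIndices = {0, 2, 5};
    // Arrays.asList(short[]) yields a List<short[]> with one element, so this
    // prints something like "[[S@1b6d3586]".
    System.out.println(Arrays.asList(liveBlockIndices));
    // Arrays.toString prints the element values: "[0, 2, 5]".
    System.out.println(Arrays.toString(liveBlockIndices));
  }
}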

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 32b0216..b0af50e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
@@ -29,12 +30,16 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,6 +56,14 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 public class TestReadStripedFileWithDecoding {
  static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
 
+  static {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+  }
+
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
@@ -66,9 +79,9 @@ public class TestReadStripedFileWithDecoding {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
-        .numDataNodes(numDNs).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/95363bcc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
index 21352b5..ca9d933 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -23,11 +23,12 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -64,17 +66,25 @@ public class TestRecoverStripedFile {
 
   static {
     GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+  }
+
+  enum RecoveryType {
+    DataOnly,
+    ParityOnly,
+    Any
   }
 
   private MiniDFSCluster cluster;
-  private Configuration conf;
   private DistributedFileSystem fs;
   // Map: DatanodeID -> datanode index in cluster
-  private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+  private Map<DatanodeID, Integer> dnMap = new HashMap<>();
+  private final Random random = new Random();
 
   @Before
   public void setup() throws IOException {
-    conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
         cellSize - 1);
@@ -104,75 +114,140 @@ public class TestRecoverStripedFile {
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock1() throws Exception {
     int fileLen = cellSize + cellSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock2() throws Exception {
     int fileLen = 1;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverOneParityBlock3() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1);
+    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen,
+        RecoveryType.ParityOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverThreeParityBlocks() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen,
+        RecoveryType.ParityOnly, 3);
   }
   
   @Test(timeout = 120000)
   public void testRecoverThreeDataBlocks() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen,
+        RecoveryType.DataOnly, 3);
   }
   
   @Test(timeout = 120000)
   public void testRecoverThreeDataBlocks1() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3);
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen,
+        RecoveryType.DataOnly, 3);
   }
   
   @Test(timeout = 120000)
   public void testRecoverOneDataBlock() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen,
+        RecoveryType.DataOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverOneDataBlock1() throws Exception {
     int fileLen = cellSize + cellSize/10;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen,
+        RecoveryType.DataOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverOneDataBlock2() throws Exception {
     int fileLen = 1;
-    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1);
+    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen,
+        RecoveryType.DataOnly, 1);
   }
   
   @Test(timeout = 120000)
   public void testRecoverAnyBlocks() throws Exception {
     int fileLen = 3 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen,
+        RecoveryType.Any, 2);
   }
   
   @Test(timeout = 120000)
   public void testRecoverAnyBlocks1() throws Exception {
     int fileLen = 10 * blockSize + blockSize/10;
-    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3);
+    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen,
+        RecoveryType.Any, 3);
   }
-  
+
+  private int[] generateDeadDnIndices(RecoveryType type, int deadNum,
+      byte[] indices) {
+    List<Integer> deadList = new ArrayList<>(deadNum);
+    while (deadList.size() < deadNum) {
+      int dead = random.nextInt(indices.length);
+      boolean isOfType = true;
+      if (type == RecoveryType.DataOnly) {
+        isOfType = indices[dead] < dataBlkNum;
+      } else if (type == RecoveryType.ParityOnly) {
+        isOfType = indices[dead] >= dataBlkNum;
+      }
+      if (isOfType && !deadList.contains(dead)) {
+        deadList.add(dead);
+      }
+    }
+    int[] d = new int[deadNum];
+    for (int i = 0; i < deadNum; i++) {
+      d[i] = deadList.get(i);
+    }
+    return d;
+  }
+
+  private void shutdownDataNodes(DataNode dn) throws IOException {
+    /*
+     * Kill the datanode which contains one replica
+     * We need to make sure it dead in namenode: clear its update time and
+     * trigger NN to check heartbeat.
+     */
+    dn.shutdown();
+    cluster.setDataNodeDead(dn.getDatanodeId());
+  }
+
+  private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
+      RecoveryType type)
+    throws IOException {
+    int stoppedDN = 0;
+    for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) {
+      if (stoppedDN == 0 || type != RecoveryType.DataOnly
+          || random.nextBoolean()) {
+        // stop at least one DN to trigger recovery
+        LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
+            + " with internal block " + target.getKey());
+        shutdownDataNodes(target.getValue());
+        stoppedDN++;
+      } else { // corrupt the data on the DN
+        LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()
+            + " with internal block " + target.getKey());
+        cluster.corruptReplica(target.getValue(), target.getKey());
+      }
+    }
+    return stoppedDN;
+  }
+
   /**
    * Test the file blocks recovery.
    * 1. Check the replica is recovered in the target datanode, 
@@ -180,11 +255,7 @@ public class TestRecoverStripedFile {
    * 2. Read the file and verify content. 
    */
   private void assertFileBlocksRecovery(String fileName, int fileLen,
-      int recovery, int toRecoverBlockNum) throws Exception {
-    if (recovery != 0 && recovery != 1 && recovery != 2) {
-      Assert.fail("Invalid recovery: 0 is to recovery parity blocks,"
-          + "1 is to recovery data blocks, 2 is any.");
-    }
+      RecoveryType type, int toRecoverBlockNum) throws Exception {
     if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
       Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
     }
@@ -192,7 +263,7 @@ public class TestRecoverStripedFile {
     Path file = new Path(fileName);
 
     final byte[] data = new byte[fileLen];
-    ThreadLocalRandom.current().nextBytes(data);
+    Arrays.fill(data, (byte) 1);
     DFSTestUtil.writeFile(fs, file, data);
     StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
 
@@ -209,26 +280,10 @@ public class TestRecoverStripedFile {
     for (DatanodeInfo storageInfo : storageInfos) {
       bitset.set(dnMap.get(storageInfo));
     }
-    
-    int[] toDead = new int[toRecoverBlockNum];
-    int n = 0;
-    for (int i = 0; i < indices.length; i++) {
-      if (n < toRecoverBlockNum) {
-        if (recovery == 0) {
-          if (indices[i] >= dataBlkNum) {
-            toDead[n++] = i;
-          }
-        } else if (recovery == 1) {
-          if (indices[i] < dataBlkNum) {
-            toDead[n++] = i;
-          }
-        } else {
-          toDead[n++] = i;
-        }
-      } else {
-        break;
-      }
-    }
+
+    int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices);
+    LOG.info("Note: indices == " + Arrays.toString(indices)
+        + ". Generate errors on datanodes: " + Arrays.toString(dead));
     
     DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
     int[] deadDnIndices = new int[toRecoverBlockNum];
@@ -236,46 +291,41 @@ public class TestRecoverStripedFile {
     File[] replicas = new File[toRecoverBlockNum];
     File[] metadatas = new File[toRecoverBlockNum];
     byte[][] replicaContents = new byte[toRecoverBlockNum][];
+    Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length);
     for (int i = 0; i < toRecoverBlockNum; i++) {
-      dataDNs[i] = storageInfos[toDead[i]];
+      dataDNs[i] = storageInfos[dead[i]];
       deadDnIndices[i] = dnMap.get(dataDNs[i]);
-      
+
       // Check the block replica file on deadDn before it dead.
       blocks[i] = StripedBlockUtil.constructInternalBlock(
-          lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+          lastBlock.getBlock(), cellSize, dataBlkNum, indices[dead[i]]);
+      errorMap.put(blocks[i], cluster.getDataNodes().get(deadDnIndices[i]));
       replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
       metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
       // the block replica on the datanode should be the same as expected
       assertEquals(replicas[i].length(), 
           StripedBlockUtil.getInternalBlockLength(
-          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]]));
       assertTrue(metadatas[i].getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      LOG.info("replica " + i + " locates in file: " + replicas[i]);
       replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
     }
     
     int cellsNum = (fileLen - 1) / cellSize + 1;
     int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
 
-    for (int i = 0; i < toRecoverBlockNum; i++) {
-      /*
-       * Kill the datanode which contains one replica
-       * We need to make sure it dead in namenode: clear its update time and
-       * trigger NN to check heartbeat.
-       */
-      DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
-      dn.shutdown();
-      cluster.setDataNodeDead(dn.getDatanodeId());
-    }
+    // shutdown datanodes or generate corruption
+    int stoppedDN = generateErrors(errorMap, type);
 
     // Check the locatedBlocks of the file again
     locatedBlocks = getLocatedBlocks(file);
     lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
     storageInfos = lastBlock.getLocations();
-    assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+    assertEquals(storageInfos.length, groupSize - stoppedDN);
 
     int[] targetDNs = new int[dnNum - groupSize];
-    n = 0;
+    int n = 0;
     for (int i = 0; i < dnNum; i++) {
       if (!bitset.get(i)) { // not contain replica of the block.
         targetDNs[n++] = i;
@@ -289,9 +339,11 @@ public class TestRecoverStripedFile {
     // Check the replica on the new target node.
     for (int i = 0; i < toRecoverBlockNum; i++) {
      File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+      LOG.info("replica after recovery " + replicaAfterRecovery);
       File metadataAfterRecovery =
           cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
       assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+      LOG.info("replica before " + replicas[i]);
       assertTrue(metadataAfterRecovery.getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));
       byte[] replicaContentAfterRecovery =
@@ -366,7 +418,7 @@ public class TestRecoverStripedFile {
     BlockECRecoveryInfo invalidECInfo = new BlockECRecoveryInfo(
        new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
         ErasureCodingPolicyManager.getSystemDefaultPolicy());
-    List<BlockECRecoveryInfo> ecTasks = new ArrayList<BlockECRecoveryInfo>();
+    List<BlockECRecoveryInfo> ecTasks = new ArrayList<>();
     ecTasks.add(invalidECInfo);
     dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
   }
