http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
new file mode 100644
index 0000000..3c9adc4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -0,0 +1,988 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSPacket;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemoteBlockReader2;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
+
+/**
+ * ErasureCodingWorker handles the erasure coding recovery work commands.
+ * These commands are issued from the Namenode as part of the Datanode's
+ * heartbeat response. BPOfferService delegates the work to this class for
+ * handling EC commands.
+ */
+public final class ErasureCodingWorker {
+  private static final Log LOG = DataNode.LOG;
+  
+  private final DataNode datanode; 
+  private final Configuration conf;
+
+  private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
+  private final int STRIPED_READ_THRESHOLD_MILLIS;
+  private final int STRIPED_READ_BUFFER_SIZE;
+
+  public ErasureCodingWorker(Configuration conf, DataNode datanode) {
+    this.datanode = datanode;
+    this.conf = conf;
+
+    STRIPED_READ_THRESHOLD_MILLIS = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY,
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT);
+    initializeStripedReadThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, 
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT));
+    STRIPED_READ_BUFFER_SIZE = conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+  }
+  
+  private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
+    return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
+  }
+
+  private void initializeStripedReadThreadPool(int num) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using striped reads; pool threads=" + num);
+    }
+    STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+      private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = super.newThread(r);
+        t.setName("stripedRead-" + threadIndex.getAndIncrement());
+        return t;
+      }
+    }, new ThreadPoolExecutor.CallerRunsPolicy() {
+      @Override
+      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+        LOG.info("Execution for striped reading rejected, "
+            + "Executing in current thread");
+        // will run in the current thread
+        super.rejectedExecution(runnable, e);
+      }
+    });
+    STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+  }
+
+  /**
+   * Handles the Erasure Coding recovery work commands.
+   * 
+   * @param ecTasks
+   *          BlockECRecoveryInfo
+   */
+  public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
+    for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
+      try {
+        new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
+      } catch (Throwable e) {
+        LOG.warn("Failed to recover striped block " + 
+            recoveryInfo.getExtendedBlock().getLocalBlock(), e);
+      }
+    }
+  }
+
+  /**
+   * ReconstructAndTransferBlock recovers one or more missing striped blocks
+   * in the striped block group; the minimum number of live striped blocks
+   * should be no less than the number of data blocks.
+   * 
+   * | <- Striped Block Group -> |
+   *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
+   *    |          |           |          |  
+   *    v          v           v          v 
+   * +------+   +------+   +------+   +------+
+   * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...    
+   * +------+   +------+   +------+   +------+     
+   * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
+   * +------+   +------+   +------+   +------+
+   * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
+   * +------+   +------+   +------+   +------+
+   *  ...         ...       ...         ...
+   *  
+   * 
+   * We use the following steps to recover a striped block group; in each
+   * round we recover <code>bufferSize</code> data until finished, where
+   * <code>bufferSize</code> is configurable and may be smaller or larger
+   * than the cell size:
+   * step1: read <code>bufferSize</code> data from the minimum number of
+   *        sources required by recovery.
+   * step2: decode data for targets.
+   * step3: transfer data to targets.
+   * 
+   * In step1, we try to read <code>bufferSize</code> data from the minimum
+   * number of sources; if there are corrupt or stale sources, reads from
+   * new sources will be scheduled. The best sources are remembered for the
+   * next round and may be updated in each round.
+   * 
+   * In step2, typically if the source blocks we read are all data blocks, we
+   * need to call encode, and if there is one parity block, we need to call
+   * decode. Note that we only read once and recover all missing striped
+   * blocks even if there is more than one.
+   * 
+   * In step3, we send the recovered data to the targets by constructing
+   * packets and sending them directly. As with continuous block replication,
+   * we don't check the packet acks. Since the datanode doing the recovery
+   * work is one of the source datanodes, the recovered data are sent
+   * remotely.
+   * 
+   * There are some points where we can make further improvements in the
+   * next phase:
+   * 1. We can read the block file directly on the local datanode;
+   *    currently we use a remote block reader. (Note that short-circuit is
+   *    not a good choice; see inline comments.)
+   * 2. Should we check the packet acks for EC recovery? Since EC recovery
+   *    is more expensive than continuous block replication and needs to
+   *    read from several other datanodes, should we make sure the
+   *    recovered results are received by the targets?
+   */
+  private class ReconstructAndTransferBlock implements Runnable {
+    private final int dataBlkNum;
+    private final int parityBlkNum;
+    private final int cellSize;
+    
+    private RawErasureDecoder decoder;
+
+    // Striped read buffer size
+    private int bufferSize;
+
+    private final ExtendedBlock blockGroup;
+    private final int minRequiredSources;
+    // position in striped internal block
+    private long positionInBlock;
+
+    // sources
+    private final short[] liveIndices;
+    private final DatanodeInfo[] sources;
+
+    private final List<StripedReader> stripedReaders;
+
+    // The buffers and indices for striped blocks whose length is 0
+    private ByteBuffer[] zeroStripeBuffers;
+    private short[] zeroStripeIndices;
+
+    // targets
+    private final DatanodeInfo[] targets;
+    private final StorageType[] targetStorageTypes;
+
+    private final short[] targetIndices;
+    private final ByteBuffer[] targetBuffers;
+
+    private final Socket[] targetSockets;
+    private final DataOutputStream[] targetOutputStreams;
+    private final DataInputStream[] targetInputStreams;
+
+    private final long[] blockOffset4Targets;
+    private final long[] seqNo4Targets;
+
+    private final static int WRITE_PACKET_SIZE = 64 * 1024;
+    private DataChecksum checksum;
+    private int maxChunksPerPacket;
+    private byte[] packetBuf;
+    private byte[] checksumBuf;
+    private int bytesPerChecksum;
+    private int checksumSize;
+
+    private final CachingStrategy cachingStrategy;
+
+    private final Map<Future<Void>, Integer> futures = new HashMap<>();
+    private final CompletionService<Void> readService =
+        new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
+
+    ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
+      ECSchema schema = recoveryInfo.getECSchema();
+      dataBlkNum = schema.getNumDataUnits();
+      parityBlkNum = schema.getNumParityUnits();
+      cellSize = recoveryInfo.getCellSize();
+
+      blockGroup = recoveryInfo.getExtendedBlock();
+      final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1);
+      minRequiredSources = Math.min(cellsNum, dataBlkNum);
+
+      liveIndices = recoveryInfo.getLiveBlockIndices();
+      sources = recoveryInfo.getSourceDnInfos();
+      stripedReaders = new ArrayList<>(sources.length);
+
+      Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
+          "Not enough live striped blocks.");
+      Preconditions.checkArgument(liveIndices.length == sources.length,
+          "liveBlockIndices and source dns should match");
+
+      if (minRequiredSources < dataBlkNum) {
+        zeroStripeBuffers = 
+            new ByteBuffer[dataBlkNum - minRequiredSources];
+        zeroStripeIndices = new short[dataBlkNum - minRequiredSources];
+      }
+
+      targets = recoveryInfo.getTargetDnInfos();
+      targetStorageTypes = recoveryInfo.getTargetStorageTypes();
+      targetIndices = new short[targets.length];
+      targetBuffers = new ByteBuffer[targets.length];
+
+      Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
+          "Too many missing striped blocks.");
+
+      targetSockets = new Socket[targets.length];
+      targetOutputStreams = new DataOutputStream[targets.length];
+      targetInputStreams = new DataInputStream[targets.length];
+
+      blockOffset4Targets = new long[targets.length];
+      seqNo4Targets = new long[targets.length];
+
+      for (int i = 0; i < targets.length; i++) {
+        blockOffset4Targets[i] = 0;
+        seqNo4Targets[i] = 0;
+      }
+
+      getTargetIndices();
+      cachingStrategy = CachingStrategy.newDefaultStrategy();
+    }
+
+    private ByteBuffer allocateBuffer(int length) {
+      return ByteBuffer.allocate(length);
+    }
+
+    private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
+      return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
+          dataBlkNum, i);
+    }
+
+    private long getBlockLen(ExtendedBlock blockGroup, int i) { 
+      return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
+          cellSize, dataBlkNum, i);
+    }
+
+    /**
+     * StripedReader is used to read from one source DN; it contains a block
+     * reader, a buffer and the striped block index.
+     * Only allocate one StripedReader per source, and keep the StripedReaders
+     * in the same array order as the sources. Typically we only need to
+     * allocate the minimum number (minRequiredSources) of StripedReaders,
+     * and allocate a new one for a new source DN if some existing DN is
+     * invalid or slow.
+     * If some source DN is corrupt, set the corresponding blockReader to
+     * null and never read from it again.
+     *  
+     * @param i the array index of sources
+     * @param offsetInBlock offset for the internal block
+     * @return StripedReader
+     */
+    private StripedReader addStripedReader(int i, long offsetInBlock) {
+      StripedReader reader = new StripedReader(liveIndices[i]);
+      stripedReaders.add(reader);
+
+      BlockReader blockReader = newBlockReader(
+          getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+      if (blockReader != null) {
+        initChecksumAndBufferSizeIfNeeded(blockReader);
+        reader.blockReader = blockReader;
+      }
+      reader.buffer = allocateBuffer(bufferSize);
+      return reader;
+    }
+
+    @Override
+    public void run() {
+      datanode.incrementXmitsInProgress();
+      try {
+        // Store the array indices of the source DNs we have read from
+        // successfully. In each read iteration, the success list may be
+        // updated if some source DN is corrupt or slow, and the updated
+        // list is used for the next read iteration.
+        int[] success = new int[minRequiredSources];
+
+        int nsuccess = 0;
+        for (int i = 0; 
+            i < sources.length && nsuccess < minRequiredSources; i++) {
+          StripedReader reader = addStripedReader(i, 0);
+          if (reader.blockReader != null) {
+            success[nsuccess++] = i;
+          }
+        }
+
+        if (nsuccess < minRequiredSources) {
+          String error = "Can't find the minimum number of sources required "
+              + "by recovery, block id: " + blockGroup.getBlockId();
+          throw new IOException(error);
+        }
+
+        if (zeroStripeBuffers != null) {
+          for (int i = 0; i < zeroStripeBuffers.length; i++) {
+            zeroStripeBuffers[i] = allocateBuffer(bufferSize);
+          }
+        }
+
+        for (int i = 0; i < targets.length; i++) {
+          targetBuffers[i] = allocateBuffer(bufferSize);
+        }
+
+        checksumSize = checksum.getChecksumSize();
+        int chunkSize = bytesPerChecksum + checksumSize;
+        maxChunksPerPacket = Math.max(
+            (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
+        int maxPacketSize = chunkSize * maxChunksPerPacket 
+            + PacketHeader.PKT_MAX_HEADER_LEN;
+
+        packetBuf = new byte[maxPacketSize];
+        checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
+
+        // targetsStatus stores whether each target succeeded; it records
+        // any failed target once. If some target failed (invalid DN or
+        // transfer failed), we will not transfer data to it any more.
+        boolean[] targetsStatus = new boolean[targets.length];
+        if (initTargetStreams(targetsStatus) == 0) {
+          String error = "All targets are failed.";
+          throw new IOException(error);
+        }
+
+        long firstStripedBlockLength = getBlockLen(blockGroup, 0);
+        while (positionInBlock < firstStripedBlockLength) {
+          int toRead = Math.min(
+              bufferSize, (int)(firstStripedBlockLength - positionInBlock));
+          // step1: read from minimum source DNs required for reconstruction.
+          //   The returned success list is the source DNs we do real read from
+          success = readMinimumStripedData4Recovery(success);
+
+          // step2: decode to reconstruct targets
+          long remaining = firstStripedBlockLength - positionInBlock;
+          int toRecoverLen = remaining < bufferSize ? 
+              (int)remaining : bufferSize;
+          recoverTargets(success, targetsStatus, toRecoverLen);
+
+          // step3: transfer data
+          if (transferData2Targets(targetsStatus) == 0) {
+            String error = "Transfer failed for all targets.";
+            throw new IOException(error);
+          }
+
+          clearBuffers();
+          positionInBlock += toRead;
+        }
+
+        endTargetBlocks(targetsStatus);
+
+        // Currently we don't check the acks for packets; this is similar to
+        // block replication.
+      } catch (Throwable e) {
+        LOG.warn("Failed to recover striped block: " + blockGroup, e);
+      } finally {
+        datanode.decrementXmitsInProgress();
+        // close block readers
+        for (StripedReader stripedReader : stripedReaders) {
+          closeBlockReader(stripedReader.blockReader);
+        }
+        for (int i = 0; i < targets.length; i++) {
+          IOUtils.closeStream(targetOutputStreams[i]);
+          IOUtils.closeStream(targetInputStreams[i]);
+          IOUtils.closeStream(targetSockets[i]);
+        }
+      }
+    }
+
+    // init checksum from block reader
+    private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
+      if (checksum == null) {
+        checksum = blockReader.getDataChecksum();
+        bytesPerChecksum = checksum.getBytesPerChecksum();
+        // The bufferSize is rounded down to a multiple of bytesPerChecksum
+        int readBufferSize = STRIPED_READ_BUFFER_SIZE;
+        bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
+          readBufferSize - readBufferSize % bytesPerChecksum;
+      } else {
+        assert blockReader.getDataChecksum().equals(checksum);
+      }
+    }
+
+    private void getTargetIndices() {
+      BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
+      for (int i = 0; i < sources.length; i++) {
+        bitset.set(liveIndices[i]);
+      }
+      int m = 0;
+      int k = 0;
+      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+        if (!bitset.get(i)) {
+          if (getBlockLen(blockGroup, i) > 0) {
+            if (m < targets.length) {
+              targetIndices[m++] = (short)i;
+            }
+          } else {
+            zeroStripeIndices[k++] = (short)i;
+          }
+        }
+      }
+    }
+
+    private long getReadLength(int index) {
+      long blockLen = getBlockLen(blockGroup, index);
+      long remaining = blockLen - positionInBlock;
+      return remaining > bufferSize ? bufferSize : remaining;
+    }
+
+    /**
+     * Read from the minimum number of source DNs required for reconstruction
+     * in this iteration.
+     * First try the success list, which we think contains the best DNs.
+     * If a source DN is corrupt or slow, try to read from some other source
+     * DN, and update the success list.
+     * 
+     * Remember the updated success list and return it for the following
+     * operations and the next iteration's read.
+     * 
+     * @param success the initial success list of source DNs we think best
+     * @return updated success list of source DNs we actually read from
+     * @throws IOException
+     */
+    private int[] readMinimumStripedData4Recovery(final int[] success)
+        throws IOException {
+      int nsuccess = 0;
+      int[] newSuccess = new int[minRequiredSources];
+      BitSet used = new BitSet(sources.length);
+      /*
+       * Read from the minimum number of source DNs required; the success
+       * list contains the source DNs which we think are best.
+       */
+      for (int i = 0; i < minRequiredSources; i++) {
+        StripedReader reader = stripedReaders.get(success[i]);
+        if (getReadLength(liveIndices[success[i]]) > 0) {
+          Callable<Void> readCallable = readFromBlock(
+              reader.blockReader, reader.buffer);
+          Future<Void> f = readService.submit(readCallable);
+          futures.put(f, success[i]);
+        } else {
+          // If the read length is 0, we don't need to do a real read
+          reader.buffer.position(0);
+          newSuccess[nsuccess++] = success[i];
+        }
+        used.set(success[i]);
+      }
+
+      while (!futures.isEmpty()) {
+        try {
+          StripingChunkReadResult result =
+              StripedBlockUtil.getNextCompletedStripedRead(
+                  readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
+          int resultIndex = -1;
+          if (result.state == StripingChunkReadResult.SUCCESSFUL) {
+            resultIndex = result.index;
+          } else if (result.state == StripingChunkReadResult.FAILED) {
+            // If read failed for some source DN, we should not use it anymore 
+            // and schedule read from another source DN.
+            StripedReader failedReader = stripedReaders.get(result.index);
+            closeBlockReader(failedReader.blockReader);
+            failedReader.blockReader = null;
+            resultIndex = scheduleNewRead(used);
+          } else if (result.state == StripingChunkReadResult.TIMEOUT) {
+            // If timeout, we also schedule a new read.
+            resultIndex = scheduleNewRead(used);
+          }
+          if (resultIndex >= 0) {
+            newSuccess[nsuccess++] = resultIndex;
+            if (nsuccess >= minRequiredSources) {
+              // cancel remaining reads if we read successfully from minimum
+              // number of source DNs required by reconstruction.
+              cancelReads(futures.keySet());
+              futures.clear();
+              break;
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.info("Read data interrupted.", e);
+          break;
+        }
+      }
+
+      if (nsuccess < minRequiredSources) {
+        String error = "Can't read data from the minimum number of sources "
+            + "required by reconstruction, block id: " + blockGroup.getBlockId();
+        throw new IOException(error);
+      }
+
+      return newSuccess;
+    }
+    
+    private void paddingBufferToLen(ByteBuffer buffer, int len) {
+      int toPadding = len - buffer.position();
+      for (int i = 0; i < toPadding; i++) {
+        buffer.put((byte) 0);
+      }
+    }
+    
+    // Initialize decoder
+    private void initDecoderIfNecessary() {
+      if (decoder == null) {
+        decoder = newDecoder(dataBlkNum, parityBlkNum);
+      }
+    }
+
+    private int[] getErasedIndices(boolean[] targetsStatus) {
+      int[] result = new int[targets.length];
+      int m = 0;
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          result[m++] = convertIndex4Decode(targetIndices[i], 
+              dataBlkNum, parityBlkNum);
+        }
+      }
+      return Arrays.copyOf(result, m);
+    }
+
+    private void recoverTargets(int[] success, boolean[] targetsStatus,
+        int toRecoverLen) {
+      initDecoderIfNecessary();
+      ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+      for (int i = 0; i < success.length; i++) {
+        StripedReader reader = stripedReaders.get(success[i]);
+        ByteBuffer buffer = reader.buffer;
+        paddingBufferToLen(buffer, toRecoverLen);
+        inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = 
+            (ByteBuffer)buffer.flip();
+      }
+      if (success.length < dataBlkNum) {
+        for (int i = 0; i < zeroStripeBuffers.length; i++) {
+          ByteBuffer buffer = zeroStripeBuffers[i];
+          paddingBufferToLen(buffer, toRecoverLen);
+          int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
+              parityBlkNum);
+          inputs[index] = (ByteBuffer)buffer.flip();
+        }
+      }
+      int[] erasedIndices = getErasedIndices(targetsStatus);
+      ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length];
+      int m = 0;
+      for (int i = 0; i < targetBuffers.length; i++) {
+        if (targetsStatus[i]) {
+          // Set the limit on the buffer itself; outputs[i] may be null or a
+          // different buffer when an earlier target has failed.
+          targetBuffers[i].limit(toRecoverLen);
+          outputs[m++] = targetBuffers[i];
+        }
+      }
+      decoder.decode(inputs, erasedIndices, outputs);
+
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          long blockLen = getBlockLen(blockGroup, targetIndices[i]);
+          long remaining = blockLen - positionInBlock;
+          if (remaining < 0) {
+            targetBuffers[i].limit(0);
+          } else if (remaining < toRecoverLen) {
+            targetBuffers[i].limit((int)remaining);
+          }
+        }
+      }
+    }
+
+    /**
+     * Schedule a read from some new source DN if some DN is corrupt
+     * or slow; this is called from the read iteration.
+     * Initially we may only have <code>minRequiredSources</code> number of
+     * StripedReaders.
+     * If the position is at the end of the target block, no real read is
+     * needed; return the array index of the source DN, otherwise return -1.
+     * 
+     * @param used the used source DNs in this iteration.
+     * @return the array index of the source DN if no real read is needed.
+     */
+    private int scheduleNewRead(BitSet used) {
+      StripedReader reader = null;
+      // step1: initially we may only have <code>minRequiredSources</code>
+      // number of StripedReaders, and there may be some source DNs we have
+      // never read from before, so we will try to create a StripedReader
+      // for one new source DN and try to read from it. If found, go to
+      // step 3.
+      int m = stripedReaders.size();
+      while (reader == null && m < sources.length) {
+        reader = addStripedReader(m, positionInBlock);
+        if (getReadLength(liveIndices[m]) > 0) {
+          if (reader.blockReader == null) {
+            reader = null;
+            m++;
+          }
+        } else {
+          used.set(m);
+          return m;
+        }
+      }
+
+      // step2: if there is no new source DN we can use, try to find a source
+      // DN we have read from before but which, for some reason (e.g., it was
+      // slow), is not in the success DN list at the beginning of this
+      // iteration, so we have not tried it in this iteration. Now we have a
+      // chance to revisit it again.
+      for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
+        if (!used.get(i)) {
+          StripedReader r = stripedReaders.get(i);
+          if (getReadLength(liveIndices[i]) > 0) {
+            closeBlockReader(r.blockReader);
+            r.blockReader = newBlockReader(
+                getBlock(blockGroup, liveIndices[i]), positionInBlock,
+                sources[i]);
+            if (r.blockReader != null) {
+              m = i;
+              reader = r;
+            }
+          } else {
+            used.set(i);
+            r.buffer.position(0);
+            return i;
+          }
+        }
+      }
+
+      // step3: schedule the read if we found a usable source DN and need to
+      // do a real read.
+      if (reader != null) {
+        Callable<Void> readCallable = readFromBlock(
+            reader.blockReader, reader.buffer);
+        Future<Void> f = readService.submit(readCallable);
+        futures.put(f, m);
+        used.set(m);
+      }
+
+      return -1;
+    }
+
+    // cancel all reads.
+    private void cancelReads(Collection<Future<Void>> futures) {
+      for (Future<Void> future : futures) {
+        future.cancel(true);
+      }
+    }
+
+    private Callable<Void> readFromBlock(final BlockReader reader,
+        final ByteBuffer buf) {
+      return new Callable<Void>() {
+
+        @Override
+        public Void call() throws Exception {
+          try {
+            actualReadFromBlock(reader, buf);
+            return null;
+          } catch (IOException e) {
+            LOG.info(e.getMessage());
+            throw e;
+          }
+        }
+
+      };
+    }
+
+    /**
+     * Read bytes from block
+     */
+    private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
+        throws IOException {
+      int len = buf.remaining();
+      int n = 0;
+      while (n < len) {
+        int nread = reader.read(buf);
+        if (nread <= 0) {
+          break;
+        }
+        n += nread;
+      }
+    }
+
+    // close block reader
+    private void closeBlockReader(BlockReader blockReader) {
+      try {
+        if (blockReader != null) {
+          blockReader.close();
+        }
+      } catch (IOException e) {
+        // ignore
+      }
+    }
+
+    private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
+      return NetUtils.createSocketAddr(dnInfo.getXferAddr(
+          datanode.getDnConf().getConnectToDnViaHostname()));
+    }
+
+    private BlockReader newBlockReader(final ExtendedBlock block, 
+        long offsetInBlock, DatanodeInfo dnInfo) {
+      if (offsetInBlock >= block.getNumBytes()) {
+        return null;
+      }
+      try {
+        InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
+        Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
+            block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
+        /*
+         * This can be further improved if the replica is local; then we
+         * could read directly from the DN, though we would need to check
+         * that the replica is in FINALIZED state. Note that we should not
+         * use short-circuit local reads, which require domain-socket config
+         * on UNIX or legacy config on Windows.
+         */
+        return RemoteBlockReader2.newBlockReader(
+            "dummy", block, blockToken, offsetInBlock, 
+            block.getNumBytes() - offsetInBlock, true,
+            "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
+            null, cachingStrategy);
+      } catch (IOException e) {
+        return null;
+      }
+    }
+
+    private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
+        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+        throws IOException {
+      Peer peer = null;
+      boolean success = false;
+      Socket sock = null;
+      final int socketTimeout = datanode.getDnConf().getSocketTimeout(); 
+      try {
+        sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+        NetUtils.connect(sock, addr, socketTimeout);
+        peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(), 
+            sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
+            blockToken, datanodeId);
+        peer.setReadTimeout(socketTimeout);
+        success = true;
+        return peer;
+      } finally {
+        if (!success) {
+          IOUtils.cleanup(LOG, peer);
+          IOUtils.closeSocket(sock);
+        }
+      }
+    }
+
+    /**
+     * Send data to targets
+     */
+    private int transferData2Targets(boolean[] targetsStatus) {
+      int nsuccess = 0;
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          boolean success = false;
+          try {
+            ByteBuffer buffer = targetBuffers[i];
+            
+            if (buffer.remaining() == 0) {
+              continue;
+            }
+
+            checksum.calculateChunkedSums(
+                buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
+
+            int ckOff = 0;
+            while (buffer.remaining() > 0) {
+              DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
+                  blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
+              int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
+              int toWrite = buffer.remaining() > maxBytesToPacket ?
+                  maxBytesToPacket : buffer.remaining();
+              int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
+              packet.writeChecksum(checksumBuf, ckOff, ckLen);
+              ckOff += ckLen;
+              packet.writeData(buffer, toWrite);
+
+              // Send packet
+              packet.writeTo(targetOutputStreams[i]);
+
+              blockOffset4Targets[i] += toWrite;
+              nsuccess++;
+              success = true;
+            }
+          } catch (IOException e) {
+            LOG.warn(e.getMessage());
+          }
+          targetsStatus[i] = success;
+        }
+      }
+      return nsuccess;
+    }
+
+    /**
+     * clear all buffers
+     */
+    private void clearBuffers() {
+      for (StripedReader stripedReader : stripedReaders) {
+        if (stripedReader.buffer != null) {
+          stripedReader.buffer.clear();
+        }
+      }
+
+      if (zeroStripeBuffers != null) {
+        for (int i = 0; i < zeroStripeBuffers.length; i++) {
+          zeroStripeBuffers[i].clear();
+        }
+      }
+
+      for (int i = 0; i < targetBuffers.length; i++) {
+        if (targetBuffers[i] != null) {
+          cleanBuffer(targetBuffers[i]);
+        }
+      }
+    }
+    
+    private ByteBuffer cleanBuffer(ByteBuffer buffer) {
+      Arrays.fill(buffer.array(), (byte) 0);
+      return (ByteBuffer)buffer.clear();
+    }
+
+    // send an empty packet to mark the end of the block
+    private void endTargetBlocks(boolean[] targetsStatus) {
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          try {
+            DFSPacket packet = new DFSPacket(packetBuf, 0, 
+                blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
+            packet.writeTo(targetOutputStreams[i]);
+            targetOutputStreams[i].flush();
+          } catch (IOException e) {
+            LOG.warn(e.getMessage());
+          }
+        }
+      }
+    }
+
+    /**
+     * Initialize output/input streams for transferring data to target
+     * and send create block request. 
+     */
+    private int initTargetStreams(boolean[] targetsStatus) {
+      int nsuccess = 0;
+      for (int i = 0; i < targets.length; i++) {
+        Socket socket = null;
+        DataOutputStream out = null;
+        DataInputStream in = null;
+        boolean success = false;
+        try {
+          InetSocketAddress targetAddr = 
+              getSocketAddress4Transfer(targets[i]);
+          socket = datanode.newSocket();
+          NetUtils.connect(socket, targetAddr, 
+              datanode.getDnConf().getSocketTimeout());
+          socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
+
+          ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
+          Token<BlockTokenIdentifier> blockToken = 
+              datanode.getBlockAccessToken(block,
+                  EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+
+          long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
+          OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
+          InputStream unbufIn = NetUtils.getInputStream(socket);
+          DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+          IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
+              socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
+
+          unbufOut = saslStreams.out;
+          unbufIn = saslStreams.in;
+
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              DFSUtil.getSmallBufferSize(conf)));
+          in = new DataInputStream(unbufIn);
+
+          DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
+          new Sender(out).writeBlock(block, targetStorageTypes[i], 
+              blockToken, "", new DatanodeInfo[]{targets[i]}, 
+              new StorageType[]{targetStorageTypes[i]}, source, 
+              BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, 
+              checksum, cachingStrategy, false, false, null);
+
+          targetSockets[i] = socket;
+          targetOutputStreams[i] = out;
+          targetInputStreams[i] = in;
+          nsuccess++;
+          success = true;
+        } catch (Throwable e) {
+          LOG.warn(e.getMessage());
+        } finally {
+          if (!success) {
+            IOUtils.closeStream(out);
+            IOUtils.closeStream(in);
+            IOUtils.closeStream(socket);
+          }
+        }
+        targetsStatus[i] = success;
+      }
+      return nsuccess;
+    }
+  }
+
+  private static class StripedReader {
+    private final short index; // internal block index
+    private BlockReader blockReader;
+    private ByteBuffer buffer;
+
+    private StripedReader(short index) {
+      this.index = index;
+    }
+  }
+}
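
The three-step loop described in the ReconstructAndTransferBlock javadoc
(read from the minimum number of sources, decode, transfer) can be
illustrated with a minimal, self-contained sketch. This is not the DataNode
code path: byte arrays stand in for remote block readers, XOR parity stands
in for the RS decoder, and all names below are illustrative.

import java.util.Arrays;

public class RecoveryRoundSketch {
  public static void main(String[] args) {
    final int bufferSize = 4;  // per-round striped read buffer
    // Two surviving internal blocks of a toy group: one data block and an
    // XOR parity block, where parity[i] = data0[i] ^ data1[i].
    byte[] data0  = {1, 2, 3, 4, 5, 6, 7, 8};
    byte[] parity = {9, 8, 7, 6, 5, 4, 3, 2};
    byte[] recovered = new byte[data0.length];  // the missing data1

    for (int pos = 0; pos < recovered.length; pos += bufferSize) {
      int toRead = Math.min(bufferSize, recovered.length - pos);
      // step1: "read" toRead bytes from each live source (here the arrays
      // are already in memory; the worker schedules parallel block reads).
      // step2: decode the missing range; ErasureCodingWorker uses an
      // RSRawDecoder, but XOR suffices for a single erasure in this toy.
      for (int i = 0; i < toRead; i++) {
        recovered[pos + i] = (byte) (data0[pos + i] ^ parity[pos + i]);
      }
      // step3: transfer the reconstructed range to the target DN; the real
      // worker packs it into DFSPackets and writes to the target stream.
    }
    System.out.println(Arrays.toString(recovered));
  }
}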

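The packet sizing arithmetic in run() deserves a worked example. The figures
below assume 512-byte checksum chunks with 4-byte checksums and take
PacketHeader.PKT_MAX_HEADER_LEN as 33 bytes; all three are assumptions for
illustration, not values guaranteed by the code above.

public class PacketSizing {
  public static void main(String[] args) {
    int writePacketSize = 64 * 1024;  // WRITE_PACKET_SIZE above
    int bytesPerChecksum = 512;       // assumed dfs.bytes-per-checksum
    int checksumSize = 4;             // assumed 4-byte CRC
    int pktMaxHeaderLen = 33;         // assumed PKT_MAX_HEADER_LEN value
    int chunkSize = bytesPerChecksum + checksumSize;                   // 516
    int maxChunksPerPacket =
        Math.max((writePacketSize - pktMaxHeaderLen) / chunkSize, 1);  // 126
    int maxPacketSize = chunkSize * maxChunksPerPacket + pktMaxHeaderLen;
    System.out.println(maxPacketSize);  // 65049, just under WRITE_PACKET_SIZE
  }
}
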
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index afacebb..ea6ba54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -174,8 +175,20 @@ public class Mover {
     }
   }
 
-  DBlock newDBlock(Block block, List<MLocation> locations) {
-    final DBlock db = new DBlock(block);
+  DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
+                   ECSchema ecSchema) {
+    Block blk = lb.getBlock().getLocalBlock();
+    DBlock db;
+    if (lb.isStriped()) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+      byte[] indices = new byte[lsb.getBlockIndices().length];
+      for (int i = 0; i < indices.length; i++) {
+        indices[i] = (byte) lsb.getBlockIndices()[i];
+      }
+      db = new DBlockStriped(blk, indices, (short) ecSchema.getNumDataUnits());
+    } else {
+      db = new DBlock(blk);
+    }
     for(MLocation ml : locations) {
       StorageGroup source = storages.getSource(ml);
       if (source != null) {
@@ -358,9 +371,10 @@ public class Mover {
         LOG.warn("Failed to get the storage policy of file " + fullPath);
         return;
       }
-      final List<StorageType> types = policy.chooseStorageTypes(
+      List<StorageType> types = policy.chooseStorageTypes(
           status.getReplication());
 
+      final ECSchema ecSchema = status.getECSchema();
       final LocatedBlocks locatedBlocks = status.getBlockLocations();
       final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
       List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
@@ -370,10 +384,13 @@ public class Mover {
           continue;
         }
         LocatedBlock lb = lbs.get(i);
+        if (lb.isStriped()) {
+          types = policy.chooseStorageTypes((short) lb.getLocations().length);
+        }
         final StorageTypeDiff diff = new StorageTypeDiff(types,
             lb.getStorageTypes());
         if (!diff.removeOverlap(true)) {
-          if (scheduleMoves4Block(diff, lb)) {
+          if (scheduleMoves4Block(diff, lb, ecSchema)) {
             result.updateHasRemaining(diff.existing.size() > 1
                 && diff.expected.size() > 1);
             // One block scheduled successfully, set noBlockMoved to false
@@ -385,10 +402,13 @@ public class Mover {
       }
     }
 
-    boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+    boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb,
+                                ECSchema ecSchema) {
       final List<MLocation> locations = MLocation.toLocations(lb);
-      Collections.shuffle(locations);
-      final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
+      if (!(lb instanceof LocatedStripedBlock)) {
+        Collections.shuffle(locations);
+      }
+      final DBlock db = newDBlock(lb, locations, ecSchema);
 
       for (final StorageType t : diff.existing) {
         for (final MLocation ml : locations) {
@@ -781,4 +801,4 @@ public class Mover {
       System.exit(-1);
     }
   }
-}
\ No newline at end of file
+}
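
One design choice above is easy to miss: scheduleMoves4Block no longer
shuffles the locations of a striped block. The reason is that for a
LocatedStripedBlock the i-th location is paired positionally with the i-th
entry of getBlockIndices(). A hedged, standalone sketch with made-up values
(these are not the Mover's data structures):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StripedOrderDemo {
  public static void main(String[] args) {
    byte[] blockIndices = {0, 2, 3};  // internal-block index per location
    List<String> locations =
        new ArrayList<>(Arrays.asList("dn1", "dn2", "dn3"));
    // Shuffling locations alone, as the Mover still does for contiguous
    // (replicated) blocks, would silently re-pair datanodes with the wrong
    // internal blocks:
    Collections.shuffle(locations);
    for (int i = 0; i < blockIndices.length; i++) {
      System.out.println(locations.get(i)
          + " now claims internal block " + blockIndices[i]);
    }
  }
}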

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java
new file mode 100644
index 0000000..4c4aae9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This manages the EC schemas predefined and activated in the system.
+ * It loads customized schemas and syncs with the ones persisted in the
+ * NameNode image.
+ *
+ * This class is instantiated by the FSNamesystem.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public final class ErasureCodingSchemaManager {
+
+  /**
+   * TODO: HDFS-8095
+   */
+  private static final int DEFAULT_DATA_BLOCKS = 6;
+  private static final int DEFAULT_PARITY_BLOCKS = 3;
+  private static final String DEFAULT_CODEC_NAME = "rs";
+  private static final String DEFAULT_SCHEMA_NAME = "RS-6-3";
+  private static final ECSchema SYS_DEFAULT_SCHEMA =
+      new ECSchema(DEFAULT_SCHEMA_NAME,
+               DEFAULT_CODEC_NAME, DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS);
+
+  // We may add more later.
+  private static ECSchema[] SYS_SCHEMAS = new ECSchema[] {
+      SYS_DEFAULT_SCHEMA
+  };
+
+  /**
+   * All active EC schemas maintained in NN memory for fast querying,
+   * identified and sorted by name.
+   */
+  private final Map<String, ECSchema> activeSchemas;
+
+  ErasureCodingSchemaManager() {
+
+    this.activeSchemas = new TreeMap<String, ECSchema>();
+    for (ECSchema schema : SYS_SCHEMAS) {
+      activeSchemas.put(schema.getSchemaName(), schema);
+    }
+
+    /**
+     * TODO: HDFS-7859 persist into NameNode
+     * load persistent schemas from image and editlog, which is done only once
+     * during NameNode startup. This can be done here or in a separate method.
+     */
+  }
+
+  /**
+   * Get system defined schemas.
+   * @return system schemas
+   */
+  public static ECSchema[] getSystemSchemas() {
+    return SYS_SCHEMAS;
+  }
+
+  /**
+   * Get system-wide default EC schema, which can be used by default when no
+   * schema is specified for an EC zone.
+   * @return schema
+   */
+  public static ECSchema getSystemDefaultSchema() {
+    return SYS_DEFAULT_SCHEMA;
+  }
+
+  /**
+   * Tell whether the specified schema is the system default one or not.
+   * @param schema
+   * @return true if it's the default, false otherwise
+   */
+  public static boolean isSystemDefault(ECSchema schema) {
+    if (schema == null) {
+      throw new IllegalArgumentException("Invalid schema parameter");
+    }
+
+    // schema name is the identifier.
+    return SYS_DEFAULT_SCHEMA.getSchemaName().equals(schema.getSchemaName());
+  }
+
+  /**
+   * Get all EC schemas that are available to use.
+   * @return all EC schemas
+   */
+  public ECSchema[] getSchemas() {
+    ECSchema[] results = new ECSchema[activeSchemas.size()];
+    return activeSchemas.values().toArray(results);
+  }
+
+  /**
+   * Get the EC schema specified by the schema name.
+   * @param schemaName
+   * @return EC schema specified by the schema name
+   */
+  public ECSchema getSchema(String schemaName) {
+    return activeSchemas.get(schemaName);
+  }
+
+  /**
+   * Clear and clean up
+   */
+  public void clear() {
+    activeSchemas.clear();
+  }
+}
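
To put numbers on the RS-6-3 default, here is a hedged usage sketch. It
assumes hadoop-common and the class above are on the classpath; the overhead
figure is plain arithmetic, not an API call.

import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class SchemaDemo {
  public static void main(String[] args) {
    ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
    int d = schema.getNumDataUnits();    // 6 data blocks per group
    int p = schema.getNumParityUnits();  // 3 parity blocks per group
    // RS(6,3) tolerates any 3 lost blocks out of 9 and costs
    // (6 + 3) / 6 = 1.5x raw storage, versus 3x for triple replication.
    System.out.println(schema.getSchemaName() + ": tolerates " + p
        + " failures, overhead " + ((d + p) / (double) d) + "x");
  }
}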

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
new file mode 100644
index 0000000..2638126
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_ZONE;
+
+/**
+ * Manages the list of erasure coding zones in the filesystem.
+ * <p/>
+ * The ErasureCodingZoneManager has its own lock, but relies on the FSDirectory
+ * lock being held for many operations. The FSDirectory lock should not be
+ * taken if the manager lock is already held.
+ * TODO: consolidate zone logic w/ encrypt. zones {@link EncryptionZoneManager}
+ */
+public class ErasureCodingZoneManager {
+  private final FSDirectory dir;
+
+  /**
+   * Construct a new ErasureCodingZoneManager.
+   *
+   * @param dir Enclosing FSDirectory
+   */
+  public ErasureCodingZoneManager(FSDirectory dir) {
+    this.dir = dir;
+  }
+
+  ECSchema getErasureCodingSchema(INodesInPath iip) throws IOException {
+    ErasureCodingZone ecZone = getErasureCodingZone(iip);
+    return ecZone == null ? null : ecZone.getSchema();
+  }
+
+  ErasureCodingZone getErasureCodingZone(INodesInPath iip) throws IOException {
+    assert dir.hasReadLock();
+    Preconditions.checkNotNull(iip, "INodes cannot be null");
+    List<INode> inodes = iip.getReadOnlyINodes();
+    for (int i = inodes.size() - 1; i >= 0; i--) {
+      final INode inode = inodes.get(i);
+      if (inode == null) {
+        continue;
+      }
+      // We don't allow symlinks in an EC zone, or pointing to a file/dir in
+      // an EC zone. Therefore if a symlink is encountered, the dir shouldn't
+      // be in an EC zone.
+      // TODO: properly support symlinks in EC zones
+      if (inode.isSymlink()) {
+        return null;
+      }
+      final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
+          new ArrayList<XAttr>(0)
+          : inode.getXAttrFeature().getXAttrs();
+      for (XAttr xAttr : xAttrs) {
+        if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
+          ByteArrayInputStream bIn = new ByteArrayInputStream(xAttr.getValue());
+          DataInputStream dIn = new DataInputStream(bIn);
+          int cellSize = WritableUtils.readVInt(dIn);
+          String schemaName = WritableUtils.readString(dIn);
+          ECSchema schema = dir.getFSNamesystem()
+              .getErasureCodingSchemaManager().getSchema(schemaName);
+          return new ErasureCodingZone(dir.getInode(inode.getId())
+              .getFullPathName(), schema, cellSize);
+        }
+      }
+    }
+    return null;
+  }
+
+  List<XAttr> createErasureCodingZone(final INodesInPath srcIIP,
+      ECSchema schema, int cellSize) throws IOException {
+    assert dir.hasWriteLock();
+    Preconditions.checkNotNull(srcIIP, "INodes cannot be null");
+    String src = srcIIP.getPath();
+    if (dir.isNonEmptyDirectory(srcIIP)) {
+      throw new IOException(
+          "Attempt to create an erasure coding zone for a " +
+              "non-empty directory " + src);
+    }
+    if (srcIIP.getLastINode() != null &&
+        !srcIIP.getLastINode().isDirectory()) {
+      throw new IOException("Attempt to create an erasure coding zone " +
+          "for a file " + src);
+    }
+    if (getErasureCodingSchema(srcIIP) != null) {
+      throw new IOException("Directory " + src + " is already in an " +
+          "erasure coding zone.");
+    }
+
+    // The system default schema will be used since none was specified.
+    if (schema == null) {
+      schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
+    }
+
+    if (cellSize <= 0) {
+      cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+    }
+
+    // Write the cellsize first and then schema name
+    final XAttr ecXAttr;
+    DataOutputStream dOut = null;
+    try {
+      ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+      dOut = new DataOutputStream(bOut);
+      WritableUtils.writeVInt(dOut, cellSize);
+      // Now persist the schema name in xattr
+      WritableUtils.writeString(dOut, schema.getSchemaName());
+      ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
+          bOut.toByteArray());
+    } finally {
+      IOUtils.closeStream(dOut);
+    }
+    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+    xattrs.add(ecXAttr);
+    FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
+        EnumSet.of(XAttrSetFlag.CREATE));
+    return xattrs;
+  }
+
+  void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
+      throws IOException {
+    assert dir.hasReadLock();
+    final ErasureCodingZone srcZone = getErasureCodingZone(srcIIP);
+    final ErasureCodingZone dstZone = getErasureCodingZone(dstIIP);
+    if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) {
+      return;
+    }
+    final ECSchema srcSchema = (srcZone != null) ? srcZone.getSchema() : null;
+    final ECSchema dstSchema = (dstZone != null) ? dstZone.getSchema() : null;
+    if ((srcSchema != null && !srcSchema.equals(dstSchema)) ||
+        (dstSchema != null && !dstSchema.equals(srcSchema))) {
+      throw new IOException(
+          src + " can't be moved because the source and destination have " +
+              "different erasure coding policies.");
+    }
+  }
+}
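
The xattr value layout used above (vint cell size, then the schema name) is
easy to round-trip with the same Writable helpers. A minimal standalone
sketch, assuming hadoop-common on the classpath; the values are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

public class EcXAttrRoundTrip {
  public static void main(String[] args) throws IOException {
    // Encode: cell size first, then the schema name, mirroring
    // createErasureCodingZone above.
    ByteArrayOutputStream bOut = new ByteArrayOutputStream();
    DataOutputStream dOut = new DataOutputStream(bOut);
    WritableUtils.writeVInt(dOut, 64 * 1024);  // illustrative cell size
    WritableUtils.writeString(dOut, "RS-6-3");

    // Decode, mirroring getErasureCodingZone above.
    DataInputStream dIn = new DataInputStream(
        new ByteArrayInputStream(bOut.toByteArray()));
    int cellSize = WritableUtils.readVInt(dIn);         // 65536
    String schemaName = WritableUtils.readString(dIn);  // "RS-6-3"
    System.out.println(cellSize + " " + schemaName);
  }
}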

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index 3d79d09..07a513c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -106,6 +106,11 @@ final class FSDirAppendOp {
                 + clientMachine);
       }
       final INodeFile file = INodeFile.valueOf(inode, path, true);
+      // Appending to a file with striped blocks is not supported.
+      if (file.isStriped()) {
+        throw new UnsupportedOperationException(
+            "Cannot append to a file with striped blocks: " + src);
+      }
       BlockManager blockManager = fsd.getBlockManager();
       final BlockStoragePolicy lpPolicy = blockManager
           .getStoragePolicy("LAZY_PERSIST");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index d624f84..9b08092 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -401,7 +401,7 @@ public class FSDirAttrOp {
   static BlockInfo[] unprotectedSetReplication(
       FSDirectory fsd, String src, short replication, short[] blockRepls)
       throws QuotaExceededException, UnresolvedLinkException,
-             SnapshotAccessControlException {
+      SnapshotAccessControlException, UnsupportedActionException {
     assert fsd.hasWriteLock();
 
     final INodesInPath iip = fsd.getINodesInPath4Write(src, true);
@@ -410,6 +410,11 @@ public class FSDirAttrOp {
       return null;
     }
     INodeFile file = inode.asFile();
+    if (file.isStriped()) {
+      throw new UnsupportedActionException(
+          "Cannot set replication to a file with striped blocks");
+    }
+
     final short oldBR = file.getPreferredBlockReplication();
 
     // before setFileReplication, check for increasing block replication.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index bb00130..7503272 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -143,6 +143,7 @@ class FSDirConcatOp {
         throw new HadoopIllegalArgumentException("concat: source file " + src
             + " is invalid or empty or underConstruction");
       }
+
       // source file's preferred block size cannot be greater than the target
       // file
       if (srcINodeFile.getPreferredBlockSize() >
@@ -152,6 +153,11 @@ class FSDirConcatOp {
             + " which is greater than the target file's preferred block size "
             + targetINode.getPreferredBlockSize());
       }
+      // TODO: concatenating EC (striped) files is not currently supported
+      if (srcINodeFile.isStriped()) {
+        throw new HadoopIllegalArgumentException("concat: the src file " + src
+            + " has striped blocks");
+      }
       si.add(srcINodeFile);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
new file mode 100644
index 0000000..fd7ef33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+/**
+ * Helper class to perform erasure coding related operations.
+ */
+final class FSDirErasureCodingOp {
+
+  /**
+   * Private constructor to prevent instantiation of this
+   * static-only utility class.
+   */
+  private FSDirErasureCodingOp() {}
+
+  /**
+   * Create an erasure coding zone on directory src.
+   *
+   * @param fsn namespace
+   * @param srcArg the path of a directory which will be the root of the
+   *          erasure coding zone. The directory must be empty.
+   * @param schema ECSchema for the erasure coding zone
+   * @param cellSize Cell size of stripe
+   * @param logRetryCache whether to record RPC ids in editlog for retry
+   *          cache rebuilding
+   * @return {@link HdfsFileStatus}
+   * @throws IOException
+   */
+  static HdfsFileStatus createErasureCodingZone(final FSNamesystem fsn,
+      final String srcArg, final ECSchema schema, final int cellSize,
+      final boolean logRetryCache) throws IOException {
+    assert fsn.hasWriteLock();
+
+    String src = srcArg;
+    final byte[][] pathComponents =
+        FSDirectory.getPathComponentsForReservedPath(src);
+    final FSPermissionChecker pc = fsn.getPermissionChecker();
+    FSDirectory fsd = fsn.getFSDirectory();
+    src = fsd.resolvePath(pc, src, pathComponents);
+    final INodesInPath iip;
+    List<XAttr> xAttrs;
+    fsd.writeLock();
+    try {
+      iip = fsd.getINodesInPath4Write(src, false);
+      xAttrs = fsn.getErasureCodingZoneManager().createErasureCodingZone(
+          iip, schema, cellSize);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsn.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+    return fsd.getAuditFileInfo(iip);
+  }
+
+  /**
+   * Get the erasure coding zone information for the specified path.
+   *
+   * @param fsn namespace
+   * @param src path
+   * @return {@link ErasureCodingZone}
+   * @throws IOException
+   */
+  static ErasureCodingZone getErasureCodingZone(final FSNamesystem fsn,
+      final String src) throws IOException {
+    assert fsn.hasReadLock();
+
+    final INodesInPath iip = getINodesInPath(fsn, src);
+    return getErasureCodingZoneForPath(fsn, iip);
+  }
+
+  /**
+   * Get the erasure coding zone information for the specified path.
+   *
+   * @param fsn namespace
+   * @param iip inodes in the path containing the file
+   * @return {@link ErasureCodingZone}
+   * @throws IOException
+   */
+  static ErasureCodingZone getErasureCodingZone(final FSNamesystem fsn,
+      final INodesInPath iip) throws IOException {
+    assert fsn.hasReadLock();
+
+    return getErasureCodingZoneForPath(fsn, iip);
+  }
+
+  /**
+   * Check if the file is in an erasure coding zone.
+   *
+   * @param fsn namespace
+   * @param srcArg path
+   * @return true if the file is in an erasure coding zone, false otherwise
+   * @throws IOException
+   */
+  static boolean isInErasureCodingZone(final FSNamesystem fsn,
+      final String srcArg) throws IOException {
+    assert fsn.hasReadLock();
+
+    final INodesInPath iip = getINodesInPath(fsn, srcArg);
+    return getErasureCodingSchemaForPath(fsn, iip) != null;
+  }
+
+  /**
+   * Check if the file is in an erasure coding zone.
+   *
+   * @param fsn namespace
+   * @param iip inodes in the path containing the file
+   * @return true if the file is in an erasure coding zone, false otherwise
+   * @throws IOException
+   */
+  static boolean isInErasureCodingZone(final FSNamesystem fsn,
+      final INodesInPath iip) throws IOException {
+    return getErasureCodingSchema(fsn, iip) != null;
+  }
+
+  /**
+   * Get erasure coding schema.
+   *
+   * @param fsn namespace
+   * @param iip inodes in the path containing the file
+   * @return {@link ECSchema}
+   * @throws IOException
+   */
+  static ECSchema getErasureCodingSchema(final FSNamesystem fsn,
+      final INodesInPath iip) throws IOException {
+    assert fsn.hasReadLock();
+
+    return getErasureCodingSchemaForPath(fsn, iip);
+  }
+
+  /**
+   * Get available erasure coding schemas.
+   *
+   * @param fsn namespace
+   * @return {@link ECSchema} array
+   */
+  static ECSchema[] getErasureCodingSchemas(final FSNamesystem fsn)
+      throws IOException {
+    assert fsn.hasReadLock();
+
+    return fsn.getErasureCodingSchemaManager().getSchemas();
+  }
+
+  /**
+   * Get the ECSchema specified by the name.
+   *
+   * @param fsn namespace
+   * @param schemaName schema name
+   * @return {@link ECSchema}
+   */
+  static ECSchema getErasureCodingSchema(final FSNamesystem fsn,
+      final String schemaName) throws IOException {
+    assert fsn.hasReadLock();
+
+    return fsn.getErasureCodingSchemaManager().getSchema(schemaName);
+  }
+
+  private static INodesInPath getINodesInPath(final FSNamesystem fsn,
+      final String srcArg) throws IOException {
+    String src = srcArg;
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(src);
+    final FSDirectory fsd = fsn.getFSDirectory();
+    final FSPermissionChecker pc = fsn.getPermissionChecker();
+    src = fsd.resolvePath(pc, src, pathComponents);
+    INodesInPath iip = fsd.getINodesInPath(src, true);
+    if (fsn.isPermissionEnabled()) {
+      fsn.getFSDirectory().checkPathAccess(pc, iip, FsAction.READ);
+    }
+    return iip;
+  }
+
+  private static ErasureCodingZone getErasureCodingZoneForPath(
+      final FSNamesystem fsn, final INodesInPath iip) throws IOException {
+    final FSDirectory fsd = fsn.getFSDirectory();
+    fsd.readLock();
+    try {
+      return fsn.getErasureCodingZoneManager().getErasureCodingZone(iip);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+
+  private static ECSchema getErasureCodingSchemaForPath(final FSNamesystem fsn,
+      final INodesInPath iip) throws IOException {
+    final FSDirectory fsd = fsn.getFSDirectory();
+    fsd.readLock();
+    try {
+      return fsn.getErasureCodingZoneManager().getErasureCodingSchema(iip);
+    } finally {
+      fsd.readUnlock();
+    }
+  }
+}
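
Every helper above asserts that the caller already holds the appropriate FSNamesystem lock, so the expected calling pattern looks roughly like the sketch below (the path is illustrative, and fsn is assumed to be an FSNamesystem already in scope):

    // Sketch only: matches the assert fsn.hasReadLock() contract above.
    fsn.readLock();
    try {
      ErasureCodingZone zone =
          FSDirErasureCodingOp.getErasureCodingZone(fsn, "/ecZone"); // hypothetical path
      if (zone != null) {
        System.out.println("schema=" + zone.getSchema().getSchemaName()
            + " cellSize=" + zone.getCellSize());
      }
    } finally {
      fsn.readUnlock();
    }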

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index b69bb42..127474c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -185,6 +185,7 @@ class FSDirRenameOp {
     }
 
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    fsd.ecZoneManager.checkMoveValidity(srcIIP, dstIIP, src);
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
     verifyQuotaForRename(fsd, srcIIP, dstIIP);
@@ -357,6 +358,7 @@ class FSDirRenameOp {
 
     BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    fsd.ecZoneManager.checkMoveValidity(srcIIP, dstIIP, src);
     final INode dstInode = dstIIP.getLastINode();
     List<INodeDirectory> snapshottableDirs = new ArrayList<>();
     if (dstInode != null) { // Destination exists

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 4a45074..c4cfd34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -29,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -179,10 +182,12 @@ class FSDirStatAndListingOp {
 
       final FileEncryptionInfo feInfo = isReservedName ? null
           : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+      final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsd.getFSNamesystem(), iip);
 
       final LocatedBlocks blocks = bm.createLocatedBlocks(
           inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
-          length, needBlockToken, iip.isSnapshot(), feInfo);
+          length, needBlockToken, iip.isSnapshot(), feInfo, ecZone);
 
       // Set caching information for the located blocks.
       for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@@ -374,7 +379,7 @@ class FSDirStatAndListingOp {
       if (fsd.getINode4DotSnapshot(srcs) != null) {
         return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
             HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
-            HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
+            HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null, 0);
       }
       return null;
     }
@@ -442,6 +447,11 @@ class FSDirStatAndListingOp {
     final FileEncryptionInfo feInfo = isRawPath ? null :
         fsd.getFileEncryptionInfo(node, snapshot, iip);
 
+    final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+        fsd.getFSNamesystem(), iip);
+    final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
+    final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
+
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -471,7 +481,9 @@ class FSDirStatAndListingOp {
         node.getId(),
         childrenNum,
         feInfo,
-        storagePolicy);
+        storagePolicy,
+        schema,
+        cellSize);
   }
 
   private static INodeAttributes getINodeAttributes(
@@ -494,6 +506,8 @@ class FSDirStatAndListingOp {
     final boolean isEncrypted;
     final FileEncryptionInfo feInfo = isRawPath ? null :
         fsd.getFileEncryptionInfo(node, snapshot, iip);
+    final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+        fsd.getFSNamesystem(), iip);
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -507,7 +521,7 @@ class FSDirStatAndListingOp {
 
       loc = fsd.getBlockManager().createLocatedBlocks(
           fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
-          inSnapshot, feInfo);
+          inSnapshot, feInfo, ecZone);
       if (loc == null) {
         loc = new LocatedBlocks();
       }
@@ -518,6 +532,8 @@ class FSDirStatAndListingOp {
     }
     int childrenNum = node.isDirectory() ?
         node.asDirectory().getChildrenNum(snapshot) : 0;
+    final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
+    final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
 
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
@@ -526,7 +542,8 @@ class FSDirStatAndListingOp {
           getPermissionForFileStatus(nodeAttrs, isEncrypted),
           nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, feInfo, storagePolicy);
+          node.getId(), loc, childrenNum, feInfo, storagePolicy, schema,
+          cellSize);
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
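
Both status builders above repeat the same defaults: a null schema and a cell size of 0 for files outside any EC zone. A hypothetical pair of helpers that would capture that convention (names are illustrative; this is not part of the patch):

    // Sketch only: hypothetical helpers for the repeated ecZone defaults.
    private static ECSchema schemaOf(ErasureCodingZone ecZone) {
      return ecZone != null ? ecZone.getSchema() : null;
    }

    private static int cellSizeOf(ErasureCodingZone ecZone) {
      return ecZone != null ? ecZone.getCellSize() : 0;  // 0 means "no EC zone"
    }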

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 474c257..5d2bf09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -86,6 +87,12 @@ final class FSDirTruncateOp {
       final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
           .getStoragePolicy("LAZY_PERSIST");
 
+      // truncating a file with striped blocks is not supported
+      if (file.isStriped()) {
+        throw new UnsupportedOperationException(
+            "Cannot truncate a file with striped blocks " + src);
+      }
+
       if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
         throw new UnsupportedOperationException(
             "Cannot truncate lazy persist file " + src);
@@ -214,10 +221,12 @@ final class FSDirTruncateOp {
         file.getFileUnderConstructionFeature().getClientName(), file.getId());
     boolean shouldRecoverNow = (newBlock == null);
     BlockInfo oldBlock = file.getLastBlock();
+    Preconditions.checkState(!oldBlock.isStriped());
     boolean shouldCopyOnTruncate = shouldCopyOnTruncate(fsn, file, oldBlock);
     if (newBlock == null) {
-      newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock() : new Block(
-          oldBlock.getBlockId(), oldBlock.getNumBytes(),
+      newBlock = (shouldCopyOnTruncate) ?
+          fsn.createNewBlock(file.isStriped()) :
+          new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
           fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock(
               oldBlock)));
     }
@@ -231,7 +240,8 @@ final class FSDirTruncateOp {
           file.getPreferredBlockReplication());
       truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
       truncatedBlockUC.setTruncateBlock(oldBlock);
-      file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+      file.convertLastBlockToUC(truncatedBlockUC,
+          blockManager.getStorages(oldBlock));
       blockManager.addBlockCollection(truncatedBlockUC, file);
 
       NameNode.stateChangeLog.debug(
