http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java new file mode 100644 index 0000000..3c9adc4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -0,0 +1,988 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSPacket; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.RemoteBlockReader2; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import 
org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Preconditions; + +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode; + +/** + * ErasureCodingWorker handles the erasure coding recovery work commands. These + * commands would be issued from Namenode as part of Datanode's heart beat + * response. BPOfferService delegates the work to this class for handling EC + * commands. + */ +public final class ErasureCodingWorker { + private static final Log LOG = DataNode.LOG; + + private final DataNode datanode; + private final Configuration conf; + + private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; + private final int STRIPED_READ_THRESHOLD_MILLIS; + private final int STRIPED_READ_BUFFER_SIZE; + + public ErasureCodingWorker(Configuration conf, DataNode datanode) { + this.datanode = datanode; + this.conf = conf; + + STRIPED_READ_THRESHOLD_MILLIS = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT); + initializeStripedReadThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT)); + STRIPED_READ_BUFFER_SIZE = conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); + } + + private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { + return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits); + } + + private void initializeStripedReadThreadPool(int num) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using striped reads; pool threads=" + num); + } + STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, + TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedRead-" + threadIndex.getAndIncrement()); + return t; + } + }, new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { + LOG.info("Execution for striped reading rejected, " + + "Executing in current thread"); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + } + + /** + * Handles the Erasure Coding recovery work commands. 
+ * + * @param ecTasks + * BlockECRecoveryInfo + */ + public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) { + for (BlockECRecoveryInfo recoveryInfo : ecTasks) { + try { + new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start(); + } catch (Throwable e) { + LOG.warn("Failed to recover striped block " + + recoveryInfo.getExtendedBlock().getLocalBlock(), e); + } + } + } + + /** + * ReconstructAndTransferBlock recovers one or more missing striped blocks + * in a striped block group; the number of live striped blocks must be + * no less than the number of data blocks. + * + * | <- Striped Block Group -> | + * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group + * | | | | + * v v v v + * +------+ +------+ +------+ +------+ + * |cell_0| |cell_1| |cell_2| |cell_3| ... + * +------+ +------+ +------+ +------+ + * |cell_4| |cell_5| |cell_6| |cell_7| ... + * +------+ +------+ +------+ +------+ + * |cell_8| |cell_9| |cell10| |cell11| ... + * +------+ +------+ +------+ +------+ + * ... ... ... ... + * + * + * We use the following steps to recover a striped block group; in each + * round, we recover <code>bufferSize</code> data until finished. The + * <code>bufferSize</code> is configurable and may be smaller or larger than + * the cell size: + * step1: read <code>bufferSize</code> data from the minimum number of + * sources required by recovery. + * step2: decode data for targets. + * step3: transfer data to targets. + * + * In step1, try to read <code>bufferSize</code> data from the minimum + * number of sources; if there are corrupt or stale sources, reads from new + * sources will be scheduled. The best sources are remembered for the next + * round and may be updated in each round. + * + * In step2, typically if the source blocks we read are all data blocks, we + * need to call encode, and if at least one parity block is read, we need + * to call decode. Notice we read only once and recover all missing striped + * blocks even if more than one is missing. + * + * In step3, send the recovered data to the targets by constructing packets + * and sending them directly. As with continuous block replication, we + * don't check the packet acks. Since the datanode doing the recovery work + * is one of the source datanodes, the recovered data is sent remotely. + * + * There are some points where we can make further improvements in the + * next phase: + * 1. We can read the block file directly on the local datanode; + * currently we use a remote block reader. (Notice short-circuit is not + * a good choice; see inline comments.) + * 2. Should we check the packet acks for EC recovery? Since EC recovery + * is more expensive than continuous block replication and needs to + * read from several other datanodes, should we make sure the + * recovered result is received by the targets?
+ */ + private class ReconstructAndTransferBlock implements Runnable { + private final int dataBlkNum; + private final int parityBlkNum; + private final int cellSize; + + private RawErasureDecoder decoder; + + // Striped read buffer size + private int bufferSize; + + private final ExtendedBlock blockGroup; + private final int minRequiredSources; + // position in striped internal block + private long positionInBlock; + + // sources + private final short[] liveIndices; + private final DatanodeInfo[] sources; + + private final List<StripedReader> stripedReaders; + + // The buffers and indices for striped blocks whose length is 0 + private ByteBuffer[] zeroStripeBuffers; + private short[] zeroStripeIndices; + + // targets + private final DatanodeInfo[] targets; + private final StorageType[] targetStorageTypes; + + private final short[] targetIndices; + private final ByteBuffer[] targetBuffers; + + private final Socket[] targetSockets; + private final DataOutputStream[] targetOutputStreams; + private final DataInputStream[] targetInputStreams; + + private final long[] blockOffset4Targets; + private final long[] seqNo4Targets; + + private final static int WRITE_PACKET_SIZE = 64 * 1024; + private DataChecksum checksum; + private int maxChunksPerPacket; + private byte[] packetBuf; + private byte[] checksumBuf; + private int bytesPerChecksum; + private int checksumSize; + + private final CachingStrategy cachingStrategy; + + private final Map<Future<Void>, Integer> futures = new HashMap<>(); + private final CompletionService<Void> readService = + new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL); + + ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { + ECSchema schema = recoveryInfo.getECSchema(); + dataBlkNum = schema.getNumDataUnits(); + parityBlkNum = schema.getNumParityUnits(); + cellSize = recoveryInfo.getCellSize(); + + blockGroup = recoveryInfo.getExtendedBlock(); + final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1); + minRequiredSources = Math.min(cellsNum, dataBlkNum); + + liveIndices = recoveryInfo.getLiveBlockIndices(); + sources = recoveryInfo.getSourceDnInfos(); + stripedReaders = new ArrayList<>(sources.length); + + Preconditions.checkArgument(liveIndices.length >= minRequiredSources, + "Not enough live striped blocks."); + Preconditions.checkArgument(liveIndices.length == sources.length, + "liveBlockIndices and source dns should match"); + + if (minRequiredSources < dataBlkNum) { + zeroStripeBuffers = + new ByteBuffer[dataBlkNum - minRequiredSources]; + zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; + } + + targets = recoveryInfo.getTargetDnInfos(); + targetStorageTypes = recoveryInfo.getTargetStorageTypes(); + targetIndices = new short[targets.length]; + targetBuffers = new ByteBuffer[targets.length]; + + Preconditions.checkArgument(targetIndices.length <= parityBlkNum, + "Too many missing striped blocks."); + + targetSockets = new Socket[targets.length]; + targetOutputStreams = new DataOutputStream[targets.length]; + targetInputStreams = new DataInputStream[targets.length]; + + blockOffset4Targets = new long[targets.length]; + seqNo4Targets = new long[targets.length]; + + for (int i = 0; i < targets.length; i++) { + blockOffset4Targets[i] = 0; + seqNo4Targets[i] = 0; + } + + getTargetIndices(); + cachingStrategy = CachingStrategy.newDefaultStrategy(); + } + + private ByteBuffer allocateBuffer(int length) { + return ByteBuffer.allocate(length); + } + + private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) { + return 
StripedBlockUtil.constructInternalBlock(blockGroup, cellSize, + dataBlkNum, i); + } + + private long getBlockLen(ExtendedBlock blockGroup, int i) { + return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(), + cellSize, dataBlkNum, i); + } + + /** + * StripedReader is used to read from one source DN; it contains a block + * reader, a buffer and the striped block index. + * A StripedReader is allocated only once per source, and the readers are + * kept in the same array order as the sources. Typically we only need to + * allocate the minimum number (minRequiredSources) of StripedReaders, and + * allocate a new one for a new source DN if some existing DN is invalid + * or slow. + * If some source DN is corrupt, set the corresponding blockReader to + * null and never read from it again. + * + * @param i the array index of sources + * @param offsetInBlock offset for the internal block + * @return StripedReader + */ + private StripedReader addStripedReader(int i, long offsetInBlock) { + StripedReader reader = new StripedReader(liveIndices[i]); + stripedReaders.add(reader); + + BlockReader blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]); + if (blockReader != null) { + initChecksumAndBufferSizeIfNeeded(blockReader); + reader.blockReader = blockReader; + } + reader.buffer = allocateBuffer(bufferSize); + return reader; + } + + @Override + public void run() { + datanode.incrementXmitsInProgress(); + try { + // Store the array indices of the source DNs we have read from + // successfully. In each read iteration the success list may be + // updated if some source DN is corrupt or slow; the updated list + // of DNs is then used for the next iteration's read. + int[] success = new int[minRequiredSources]; + + int nsuccess = 0; + for (int i = 0; + i < sources.length && nsuccess < minRequiredSources; i++) { + StripedReader reader = addStripedReader(i, 0); + if (reader.blockReader != null) { + success[nsuccess++] = i; + } + } + + if (nsuccess < minRequiredSources) { + String error = "Can't find minimum sources required by " + + "recovery, block id: " + blockGroup.getBlockId(); + throw new IOException(error); + } + + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i] = allocateBuffer(bufferSize); + } + } + + for (int i = 0; i < targets.length; i++) { + targetBuffers[i] = allocateBuffer(bufferSize); + } + + checksumSize = checksum.getChecksumSize(); + int chunkSize = bytesPerChecksum + checksumSize; + maxChunksPerPacket = Math.max( + (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1); + int maxPacketSize = chunkSize * maxChunksPerPacket + + PacketHeader.PKT_MAX_HEADER_LEN; + + packetBuf = new byte[maxPacketSize]; + checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)]; + + // targetsStatus stores whether each target succeeded; it records any + // failed target once, and if some target failed (invalid DN or + // transfer failed), no more data is transferred to it. + boolean[] targetsStatus = new boolean[targets.length]; + if (initTargetStreams(targetsStatus) == 0) { + String error = "All targets failed."; + throw new IOException(error); + } + + long firstStripedBlockLength = getBlockLen(blockGroup, 0); + while (positionInBlock < firstStripedBlockLength) { + int toRead = Math.min( + bufferSize, (int)(firstStripedBlockLength - positionInBlock)); + // step1: read from the minimum number of source DNs required for + // reconstruction. 
+ // The returned success list is the source DNs we actually read from + success = readMinimumStripedData4Recovery(success); + + // step2: decode to reconstruct targets + long remaining = firstStripedBlockLength - positionInBlock; + int toRecoverLen = remaining < bufferSize ? + (int)remaining : bufferSize; + recoverTargets(success, targetsStatus, toRecoverLen); + + // step3: transfer data + if (transferData2Targets(targetsStatus) == 0) { + String error = "Transfer failed for all targets."; + throw new IOException(error); + } + + clearBuffers(); + positionInBlock += toRead; + } + + endTargetBlocks(targetsStatus); + + // Currently we don't check the acks for packets; this is similar to + // block replication. + } catch (Throwable e) { + LOG.warn("Failed to recover striped block: " + blockGroup, e); + } finally { + datanode.decrementXmitsInProgress(); + // close block readers + for (StripedReader stripedReader : stripedReaders) { + closeBlockReader(stripedReader.blockReader); + } + for (int i = 0; i < targets.length; i++) { + IOUtils.closeStream(targetOutputStreams[i]); + IOUtils.closeStream(targetInputStreams[i]); + IOUtils.closeStream(targetSockets[i]); + } + } + } + + // init checksum from block reader + private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) { + if (checksum == null) { + checksum = blockReader.getDataChecksum(); + bytesPerChecksum = checksum.getBytesPerChecksum(); + // The bufferSize is rounded down to a multiple of bytesPerChecksum + int readBufferSize = STRIPED_READ_BUFFER_SIZE; + bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : + readBufferSize - readBufferSize % bytesPerChecksum; + } else { + assert blockReader.getDataChecksum().equals(checksum); + } + } + + private void getTargetIndices() { + BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); + for (int i = 0; i < sources.length; i++) { + bitset.set(liveIndices[i]); + } + int m = 0; + int k = 0; + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { + if (!bitset.get(i)) { + if (getBlockLen(blockGroup, i) > 0) { + if (m < targets.length) { + targetIndices[m++] = (short)i; + } + } else { + zeroStripeIndices[k++] = (short)i; + } + } + } + } + + private long getReadLength(int index) { + long blockLen = getBlockLen(blockGroup, index); + long remaining = blockLen - positionInBlock; + return remaining > bufferSize ? bufferSize : remaining; + } + + /** + * Read from the minimum number of source DNs required for reconstruction + * in this iteration. + * First try the success list, which we think contains the best DNs. + * If a source DN is corrupt or slow, try to read from some other source + * DN and update the success list. + * + * Remember the updated success list and return it for subsequent + * operations and the next iteration's read. + * + * @param success the initial success list of source DNs we think best + * @return updated success list of source DNs we actually read from + * @throws IOException + */ + private int[] readMinimumStripedData4Recovery(final int[] success) + throws IOException { + int nsuccess = 0; + int[] newSuccess = new int[minRequiredSources]; + BitSet used = new BitSet(sources.length); + /* + * Read from the minimum number of source DNs required; the success list + * contains the source DNs we think are best. 
+ */ + for (int i = 0; i < minRequiredSources; i++) { + StripedReader reader = stripedReaders.get(success[i]); + if (getReadLength(liveIndices[success[i]]) > 0) { + Callable<Void> readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future<Void> f = readService.submit(readCallable); + futures.put(f, success[i]); + } else { + // If the read length is 0, we don't need to do real read + reader.buffer.position(0); + newSuccess[nsuccess++] = success[i]; + } + used.set(success[i]); + } + + while (!futures.isEmpty()) { + try { + StripingChunkReadResult result = + StripedBlockUtil.getNextCompletedStripedRead( + readService, futures, STRIPED_READ_THRESHOLD_MILLIS); + int resultIndex = -1; + if (result.state == StripingChunkReadResult.SUCCESSFUL) { + resultIndex = result.index; + } else if (result.state == StripingChunkReadResult.FAILED) { + // If read failed for some source DN, we should not use it anymore + // and schedule read from another source DN. + StripedReader failedReader = stripedReaders.get(result.index); + closeBlockReader(failedReader.blockReader); + failedReader.blockReader = null; + resultIndex = scheduleNewRead(used); + } else if (result.state == StripingChunkReadResult.TIMEOUT) { + // If timeout, we also schedule a new read. + resultIndex = scheduleNewRead(used); + } + if (resultIndex >= 0) { + newSuccess[nsuccess++] = resultIndex; + if (nsuccess >= minRequiredSources) { + // cancel remaining reads if we read successfully from minimum + // number of source DNs required by reconstruction. + cancelReads(futures.keySet()); + futures.clear(); + break; + } + } + } catch (InterruptedException e) { + LOG.info("Read data interrupted.", e); + break; + } + } + + if (nsuccess < minRequiredSources) { + String error = "Can't read data from minimum number of sources " + + "required by reconstruction, block id: " + blockGroup.getBlockId(); + throw new IOException(error); + } + + return newSuccess; + } + + private void paddingBufferToLen(ByteBuffer buffer, int len) { + int toPadding = len - buffer.position(); + for (int i = 0; i < toPadding; i++) { + buffer.put((byte) 0); + } + } + + // Initialize decoder + private void initDecoderIfNecessary() { + if (decoder == null) { + decoder = newDecoder(dataBlkNum, parityBlkNum); + } + } + + private int[] getErasedIndices(boolean[] targetsStatus) { + int[] result = new int[targets.length]; + int m = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + result[m++] = convertIndex4Decode(targetIndices[i], + dataBlkNum, parityBlkNum); + } + } + return Arrays.copyOf(result, m); + } + + private void recoverTargets(int[] success, boolean[] targetsStatus, + int toRecoverLen) { + initDecoderIfNecessary(); + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; + for (int i = 0; i < success.length; i++) { + StripedReader reader = stripedReaders.get(success[i]); + ByteBuffer buffer = reader.buffer; + paddingBufferToLen(buffer, toRecoverLen); + inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = + (ByteBuffer)buffer.flip(); + } + if (success.length < dataBlkNum) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + ByteBuffer buffer = zeroStripeBuffers[i]; + paddingBufferToLen(buffer, toRecoverLen); + int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum, + parityBlkNum); + inputs[index] = (ByteBuffer)buffer.flip(); + } + } + int[] erasedIndices = getErasedIndices(targetsStatus); + ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length]; + int m = 0; + for (int i = 0; i < 
targetBuffers.length; i++) { + if (targetsStatus[i]) { + // outputs is indexed by m, not i; set the limit on the buffer just + // added (outputs[i] may be unset when an earlier target failed) + targetBuffers[i].limit(toRecoverLen); + outputs[m++] = targetBuffers[i]; + } + } + decoder.decode(inputs, erasedIndices, outputs); + + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + long blockLen = getBlockLen(blockGroup, targetIndices[i]); + long remaining = blockLen - positionInBlock; + if (remaining < 0) { + targetBuffers[i].limit(0); + } else if (remaining < toRecoverLen) { + targetBuffers[i].limit((int)remaining); + } + } + } + } + + /** + * Schedule a read from some new source DN if some DN is corrupt or slow; + * this is called from the read iteration. + * Initially we may only have <code>minRequiredSources</code> number of + * StripedReader. + * If the position is at the end of the target block, no real read is + * needed; return the array index of the source DN, otherwise -1. + * + * @param used the used source DNs in this iteration. + * @return the array index of the source DN if no real read is needed, + * otherwise -1. + */ + private int scheduleNewRead(BitSet used) { + StripedReader reader = null; + // step1: initially we may only have <code>minRequiredSources</code> + // number of StripedReader, and there may be some source DNs we have + // never read from before, so we will try to create a StripedReader for + // one new source DN and try to read from it. If found, go to step 3. + int m = stripedReaders.size(); + while (reader == null && m < sources.length) { + reader = addStripedReader(m, positionInBlock); + if (getReadLength(liveIndices[m]) > 0) { + if (reader.blockReader == null) { + reader = null; + m++; + } + } else { + used.set(m); + return m; + } + } + + // step2: if there is no new source DN we can use, try to find a source + // DN we have read from before but which, for some reason (e.g., it was + // slow), is not in the success DN list at the beginning of this + // iteration, so we have not tried it in this iteration. Now we have a + // chance to revisit it. + for (int i = 0; reader == null && i < stripedReaders.size(); i++) { + if (!used.get(i)) { + StripedReader r = stripedReaders.get(i); + if (getReadLength(liveIndices[i]) > 0) { + closeBlockReader(r.blockReader); + r.blockReader = newBlockReader( + getBlock(blockGroup, liveIndices[i]), positionInBlock, + sources[i]); + if (r.blockReader != null) { + m = i; + reader = r; + } + } else { + used.set(i); + r.buffer.position(0); + return i; + } + } + } + + // step3: schedule the read if we found a usable source DN and a real + // read is needed. + if (reader != null) { + Callable<Void> readCallable = readFromBlock( + reader.blockReader, reader.buffer); + Future<Void> f = readService.submit(readCallable); + futures.put(f, m); + used.set(m); + } + + return -1; + } + + // cancel all reads. 
+ private void cancelReads(Collection<Future<Void>> futures) { + for (Future<Void> future : futures) { + future.cancel(true); + } + } + + private Callable<Void> readFromBlock(final BlockReader reader, + final ByteBuffer buf) { + return new Callable<Void>() { + + @Override + public Void call() throws Exception { + try { + actualReadFromBlock(reader, buf); + return null; + } catch (IOException e) { + LOG.info(e.getMessage()); + throw e; + } + } + + }; + } + + /** + * Read bytes from block + */ + private void actualReadFromBlock(BlockReader reader, ByteBuffer buf) + throws IOException { + int len = buf.remaining(); + int n = 0; + while (n < len) { + int nread = reader.read(buf); + if (nread <= 0) { + break; + } + n += nread; + } + } + + // close block reader + private void closeBlockReader(BlockReader blockReader) { + try { + if (blockReader != null) { + blockReader.close(); + } + } catch (IOException e) { + // ignore + } + } + + private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { + return NetUtils.createSocketAddr(dnInfo.getXferAddr( + datanode.getDnConf().getConnectToDnViaHostname())); + } + + private BlockReader newBlockReader(final ExtendedBlock block, + long offsetInBlock, DatanodeInfo dnInfo) { + if (offsetInBlock >= block.getNumBytes()) { + return null; + } + try { + InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo); + Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken( + block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ)); + /* + * This can be further improved if the replica is local, then we can + * read directly from DN and need to check the replica is FINALIZED + * state, notice we should not use short-circuit local read which + * requires config for domain-socket in UNIX or legacy config in Windows. + */ + return RemoteBlockReader2.newBlockReader( + "dummy", block, blockToken, offsetInBlock, + block.getNumBytes() - offsetInBlock, true, + "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, + null, cachingStrategy); + } catch (IOException e) { + return null; + } + } + + private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { + Peer peer = null; + boolean success = false; + Socket sock = null; + final int socketTimeout = datanode.getDnConf().getSocketTimeout(); + try { + sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); + NetUtils.connect(sock, addr, socketTimeout); + peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(), + sock, datanode.getDataEncryptionKeyFactoryForBlock(b), + blockToken, datanodeId); + peer.setReadTimeout(socketTimeout); + success = true; + return peer; + } finally { + if (!success) { + IOUtils.cleanup(LOG, peer); + IOUtils.closeSocket(sock); + } + } + } + + /** + * Send data to targets + */ + private int transferData2Targets(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + boolean success = false; + try { + ByteBuffer buffer = targetBuffers[i]; + + if (buffer.remaining() == 0) { + continue; + } + + checksum.calculateChunkedSums( + buffer.array(), 0, buffer.remaining(), checksumBuf, 0); + + int ckOff = 0; + while (buffer.remaining() > 0) { + DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false); + int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum; + int toWrite = buffer.remaining() > maxBytesToPacket ? 
+ maxBytesToPacket : buffer.remaining(); + int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize; + packet.writeChecksum(checksumBuf, ckOff, ckLen); + ckOff += ckLen; + packet.writeData(buffer, toWrite); + + // Send packet + packet.writeTo(targetOutputStreams[i]); + + blockOffset4Targets[i] += toWrite; + nsuccess++; + success = true; + } + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + targetsStatus[i] = success; + } + } + return nsuccess; + } + + /** + * clear all buffers + */ + private void clearBuffers() { + for (StripedReader stripedReader : stripedReaders) { + if (stripedReader.buffer != null) { + stripedReader.buffer.clear(); + } + } + + if (zeroStripeBuffers != null) { + for (int i = 0; i < zeroStripeBuffers.length; i++) { + zeroStripeBuffers[i].clear(); + } + } + + for (int i = 0; i < targetBuffers.length; i++) { + if (targetBuffers[i] != null) { + cleanBuffer(targetBuffers[i]); + } + } + } + + private ByteBuffer cleanBuffer(ByteBuffer buffer) { + Arrays.fill(buffer.array(), (byte) 0); + return (ByteBuffer)buffer.clear(); + } + + // send an empty packet to mark the end of the block + private void endTargetBlocks(boolean[] targetsStatus) { + for (int i = 0; i < targets.length; i++) { + if (targetsStatus[i]) { + try { + DFSPacket packet = new DFSPacket(packetBuf, 0, + blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true); + packet.writeTo(targetOutputStreams[i]); + targetOutputStreams[i].flush(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + } + } + } + } + + /** + * Initialize output/input streams for transferring data to target + * and send create block request. + */ + private int initTargetStreams(boolean[] targetsStatus) { + int nsuccess = 0; + for (int i = 0; i < targets.length; i++) { + Socket socket = null; + DataOutputStream out = null; + DataInputStream in = null; + boolean success = false; + try { + InetSocketAddress targetAddr = + getSocketAddress4Transfer(targets[i]); + socket = datanode.newSocket(); + NetUtils.connect(socket, targetAddr, + datanode.getDnConf().getSocketTimeout()); + socket.setSoTimeout(datanode.getDnConf().getSocketTimeout()); + + ExtendedBlock block = getBlock(blockGroup, targetIndices[i]); + Token<BlockTokenIdentifier> blockToken = + datanode.getBlockAccessToken(block, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); + + long writeTimeout = datanode.getDnConf().getSocketWriteTimeout(); + OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(socket); + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.getSaslClient().socketSend( + socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]); + + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + DFSUtil.getSmallBufferSize(conf))); + in = new DataInputStream(unbufIn); + + DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId()); + new Sender(out).writeBlock(block, targetStorageTypes[i], + blockToken, "", new DatanodeInfo[]{targets[i]}, + new StorageType[]{targetStorageTypes[i]}, source, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, + checksum, cachingStrategy, false, false, null); + + targetSockets[i] = socket; + targetOutputStreams[i] = out; + targetInputStreams[i] = in; + nsuccess++; + success = true; + } catch (Throwable e) { + LOG.warn(e.getMessage()); + } finally { + if (!success) { + 
IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeStream(socket); + } + } + targetsStatus[i] = success; + } + return nsuccess; + } + } + + private static class StripedReader { + private final short index; // internal block index + private BlockReader blockReader; + private ByteBuffer buffer; + + private StripedReader(short index) { + this.index = index; + } + } +}
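[Editor's note] The three-step loop in run() above is easier to see in isolation. Below is a minimal, self-contained Java sketch of one recovery round, with a single XOR parity cell standing in for the Reed-Solomon decoder that ErasureCodingWorker actually obtains via CodecUtil.createRSRawDecoder. The class and method names are hypothetical, and sockets, checksums and retries are elided.

import java.nio.ByteBuffer;

// Hypothetical sketch: recover one erased data buffer per round with XOR
// parity (2 data cells + 1 parity cell); RS(6,3) generalizes this idea.
public class XorRecoveryRoundSketch {
  static void decodeRound(ByteBuffer[] inputs, int erasedIndex,
      ByteBuffer output) {
    // Every surviving input contributes to the erased buffer via XOR.
    while (output.hasRemaining()) {
      byte b = 0;
      for (int i = 0; i < inputs.length; i++) {
        if (i != erasedIndex) {
          b ^= inputs[i].get();
        }
      }
      output.put(b);
    }
  }

  public static void main(String[] args) {
    int bufferSize = 8; // one "round" worth of data; the real code uses ~1MB
    ByteBuffer d0 = ByteBuffer.wrap("ABCDEFGH".getBytes());
    ByteBuffer d1 = ByteBuffer.wrap("12345678".getBytes());
    ByteBuffer parity = ByteBuffer.allocate(bufferSize);
    for (int i = 0; i < bufferSize; i++) { // step0: encode parity = d0 ^ d1
      parity.put((byte) (d0.get(i) ^ d1.get(i)));
    }
    parity.flip();

    // Suppose d1 is lost: step1 "reads" d0 and parity, step2 decodes d1.
    ByteBuffer[] inputs = { d0, null, parity };
    ByteBuffer recovered = ByteBuffer.allocate(bufferSize);
    decodeRound(inputs, 1, recovered);
    recovered.flip();
    System.out.println(new String(recovered.array())); // prints 12345678
    // step3 would packetize 'recovered' and stream it to the target DN.
  }
}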
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index afacebb..ea6ba54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; @@ -174,8 +175,20 @@ public class Mover { } } - DBlock newDBlock(Block block, List<MLocation> locations) { - final DBlock db = new DBlock(block); + DBlock newDBlock(LocatedBlock lb, List<MLocation> locations, + ECSchema ecSchema) { + Block blk = lb.getBlock().getLocalBlock(); + DBlock db; + if (lb.isStriped()) { + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + byte[] indices = new byte[lsb.getBlockIndices().length]; + for (int i = 0; i < indices.length; i++) { + indices[i] = (byte) lsb.getBlockIndices()[i]; + } + db = new DBlockStriped(blk, indices, (short) ecSchema.getNumDataUnits()); + } else { + db = new DBlock(blk); + } for(MLocation ml : locations) { StorageGroup source = storages.getSource(ml); if (source != null) { @@ -358,9 +371,10 @@ public class Mover { LOG.warn("Failed to get the storage policy of file " + fullPath); return; } - final List<StorageType> types = policy.chooseStorageTypes( + List<StorageType> types = policy.chooseStorageTypes( status.getReplication()); + final ECSchema ecSchema = status.getECSchema(); final LocatedBlocks locatedBlocks = status.getBlockLocations(); final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete(); List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks(); @@ -370,10 +384,13 @@ public class Mover { continue; } LocatedBlock lb = lbs.get(i); + if (lb.isStriped()) { + types = policy.chooseStorageTypes((short) lb.getLocations().length); + } final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); if (!diff.removeOverlap(true)) { - if (scheduleMoves4Block(diff, lb)) { + if (scheduleMoves4Block(diff, lb, ecSchema)) { result.updateHasRemaining(diff.existing.size() > 1 && diff.expected.size() > 1); // One block scheduled successfully, set noBlockMoved to false @@ -385,10 +402,13 @@ public class Mover { } } - boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { + boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb, + ECSchema ecSchema) { final List<MLocation> locations = MLocation.toLocations(lb); - Collections.shuffle(locations); - final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations); + if (!(lb instanceof LocatedStripedBlock)) { + Collections.shuffle(locations); + } + final DBlock db = newDBlock(lb, locations, ecSchema); for (final StorageType t : diff.existing) { for (final MLocation ml : locations) { @@ -781,4 +801,4 @@ public class Mover { System.exit(-1); } } -} \ No newline at end of file +} 
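[Editor's note] Both getBlockLen() in ErasureCodingWorker and the Mover's DBlockStriped above depend on how a block group's bytes are distributed over its internal blocks. The following self-contained sketch mirrors the arithmetic of StripedBlockUtil.getInternalBlockLength as I understand it (simplified, no argument checking); the class name is hypothetical.

// Sketch of the striped-layout arithmetic behind internal block lengths.
public class StripedLengthSketch {
  // i is the index inside the block group: 0..d-1 are data blocks,
  // d..d+p-1 are parity blocks (always as long as the first data block).
  static long internalBlockLength(long groupBytes, int cellSize,
      int numDataBlocks, int i) {
    final int stripeSize = cellSize * numDataBlocks;
    final int lastStripeLen = (int) (groupBytes % stripeSize);
    if (lastStripeLen == 0) { // group ends on a stripe boundary: equal shares
      return groupBytes / numDataBlocks;
    }
    final long numStripes = (groupBytes - 1) / stripeSize + 1;
    return (numStripes - 1) * cellSize
        + lastCell(lastStripeLen, cellSize, numDataBlocks, i);
  }

  static int lastCell(int lastStripeLen, int cellSize, int numDataBlocks,
      int i) {
    if (i >= numDataBlocks) {
      i = 0; // parity cells match the first (longest) data cell
    }
    int size = lastStripeLen - i * cellSize;
    return Math.max(0, Math.min(size, cellSize));
  }

  public static void main(String[] args) {
    // RS-6-3 with 64KB cells and a 100KB block group: block 0 holds 64KB,
    // block 1 holds the remaining 36KB, blocks 2..5 hold 0 bytes, and the
    // three parity blocks are each 64KB (as long as block 0).
    for (int i = 0; i < 9; i++) {
      System.out.println(i + ": "
          + internalBlockLength(100 * 1024, 64 * 1024, 6, i));
    }
  }
}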
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java new file mode 100644 index 0000000..4c4aae9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingSchemaManager.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import java.util.Map; +import java.util.TreeMap; + +/** + * This manages the EC schemas predefined and activated in the system. + * It loads customized schemas and syncs with the ones persisted in the + * NameNode image. + * + * This class is instantiated by the FSNamesystem. + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +public final class ErasureCodingSchemaManager { + + /** + * TODO: HDFS-8095 + */ + private static final int DEFAULT_DATA_BLOCKS = 6; + private static final int DEFAULT_PARITY_BLOCKS = 3; + private static final String DEFAULT_CODEC_NAME = "rs"; + private static final String DEFAULT_SCHEMA_NAME = "RS-6-3"; + private static final ECSchema SYS_DEFAULT_SCHEMA = + new ECSchema(DEFAULT_SCHEMA_NAME, + DEFAULT_CODEC_NAME, DEFAULT_DATA_BLOCKS, DEFAULT_PARITY_BLOCKS); + + // We may add more later. + private static ECSchema[] SYS_SCHEMAS = new ECSchema[] { + SYS_DEFAULT_SCHEMA + }; + + /** + * All active EC schemas maintained in NN memory for fast querying, + * identified and sorted by name. + */ + private final Map<String, ECSchema> activeSchemas; + + ErasureCodingSchemaManager() { + + this.activeSchemas = new TreeMap<String, ECSchema>(); + for (ECSchema schema : SYS_SCHEMAS) { + activeSchemas.put(schema.getSchemaName(), schema); + } + + /** + * TODO: HDFS-7859 persist into NameNode + * load persistent schemas from image and editlog, which is done only once + * during NameNode startup. This can be done here or in a separate method. + */ + } + + /** + * Get system defined schemas. + * @return system schemas + */ + public static ECSchema[] getSystemSchemas() { + return SYS_SCHEMAS; + } + + /** + * Get the system-wide default EC schema, which can be used by default when + * no schema is specified for an EC zone. 
+ * @return schema + */ + public static ECSchema getSystemDefaultSchema() { + return SYS_DEFAULT_SCHEMA; + } + + /** + * Tell whether the specified schema is the system default one or not. + * @param schema + * @return true if it's the default, false otherwise + */ + public static boolean isSystemDefault(ECSchema schema) { + if (schema == null) { + throw new IllegalArgumentException("Invalid schema parameter"); + } + + // schema name is the identifier. + return SYS_DEFAULT_SCHEMA.getSchemaName().equals(schema.getSchemaName()); + } + + /** + * Get all EC schemas that are available to use. + * @return all EC schemas + */ + public ECSchema[] getSchemas() { + ECSchema[] results = new ECSchema[activeSchemas.size()]; + return activeSchemas.values().toArray(results); + } + + /** + * Get the EC schema specified by the schema name. + * @param schemaName + * @return EC schema specified by the schema name + */ + public ECSchema getSchema(String schemaName) { + return activeSchemas.get(schemaName); + } + + /** + * Clear and clean up. + */ + public void clear() { + activeSchemas.clear(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java new file mode 100644 index 0000000..2638126 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.hdfs.XAttrHelper; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_ZONE; + +/** + * Manages the list of erasure coding zones in the filesystem. 
+ * <p/> + * The ErasureCodingZoneManager has its own lock, but relies on the FSDirectory + * lock being held for many operations. The FSDirectory lock should not be + * taken if the manager lock is already held. + * TODO: consolidate zone logic w/ encrypt. zones {@link EncryptionZoneManager} + */ +public class ErasureCodingZoneManager { + private final FSDirectory dir; + + /** + * Construct a new ErasureCodingZoneManager. + * + * @param dir Enclosing FSDirectory + */ + public ErasureCodingZoneManager(FSDirectory dir) { + this.dir = dir; + } + + ECSchema getErasureCodingSchema(INodesInPath iip) throws IOException { + ErasureCodingZone ecZone = getErasureCodingZone(iip); + return ecZone == null ? null : ecZone.getSchema(); + } + + ErasureCodingZone getErasureCodingZone(INodesInPath iip) throws IOException { + assert dir.hasReadLock(); + Preconditions.checkNotNull(iip, "INodes cannot be null"); + List<INode> inodes = iip.getReadOnlyINodes(); + for (int i = inodes.size() - 1; i >= 0; i--) { + final INode inode = inodes.get(i); + if (inode == null) { + continue; + } + // We don't allow symlinks in an EC zone, or symlinks pointing to a + // file/dir in an EC zone. Therefore, if a symlink is encountered, the + // dir shouldn't have EC set. + // TODO: properly support symlinks in EC zones + if (inode.isSymlink()) { + return null; + } + final List<XAttr> xAttrs = inode.getXAttrFeature() == null ? + new ArrayList<XAttr>(0) + : inode.getXAttrFeature().getXAttrs(); + for (XAttr xAttr : xAttrs) { + if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) { + ByteArrayInputStream bIn = new ByteArrayInputStream(xAttr.getValue()); + DataInputStream dIn = new DataInputStream(bIn); + int cellSize = WritableUtils.readVInt(dIn); + String schemaName = WritableUtils.readString(dIn); + ECSchema schema = dir.getFSNamesystem() + .getErasureCodingSchemaManager().getSchema(schemaName); + return new ErasureCodingZone(dir.getInode(inode.getId()) + .getFullPathName(), schema, cellSize); + } + } + } + return null; + } + + List<XAttr> createErasureCodingZone(final INodesInPath srcIIP, + ECSchema schema, int cellSize) throws IOException { + assert dir.hasWriteLock(); + Preconditions.checkNotNull(srcIIP, "INodes cannot be null"); + String src = srcIIP.getPath(); + if (dir.isNonEmptyDirectory(srcIIP)) { + throw new IOException( + "Attempt to create an erasure coding zone for a " + + "non-empty directory " + src); + } + if (srcIIP.getLastINode() != null && + !srcIIP.getLastINode().isDirectory()) { + throw new IOException("Attempt to create an erasure coding zone " + + "for a file " + src); + } + if (getErasureCodingSchema(srcIIP) != null) { + throw new IOException("Directory " + src + " is already in an " + + "erasure coding zone."); + } + + // The system default schema will be used since none is specified. 
+ if (schema == null) { + schema = ErasureCodingSchemaManager.getSystemDefaultSchema(); + } + + if (cellSize <= 0) { + cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + } + + // Write the cellsize first and then schema name + final XAttr ecXAttr; + DataOutputStream dOut = null; + try { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + dOut = new DataOutputStream(bOut); + WritableUtils.writeVInt(dOut, cellSize); + // Now persist the schema name in xattr + WritableUtils.writeString(dOut, schema.getSchemaName()); + ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE, + bOut.toByteArray()); + } finally { + IOUtils.closeStream(dOut); + } + final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1); + xattrs.add(ecXAttr); + FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs, + EnumSet.of(XAttrSetFlag.CREATE)); + return xattrs; + } + + void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src) + throws IOException { + assert dir.hasReadLock(); + final ErasureCodingZone srcZone = getErasureCodingZone(srcIIP); + final ErasureCodingZone dstZone = getErasureCodingZone(dstIIP); + if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) { + return; + } + final ECSchema srcSchema = (srcZone != null) ? srcZone.getSchema() : null; + final ECSchema dstSchema = (dstZone != null) ? dstZone.getSchema() : null; + if ((srcSchema != null && !srcSchema.equals(dstSchema)) || + (dstSchema != null && !dstSchema.equals(srcSchema))) { + throw new IOException( + src + " can't be moved because the source and destination have " + + "different erasure coding policies."); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java index 3d79d09..07a513c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java @@ -106,6 +106,11 @@ final class FSDirAppendOp { + clientMachine); } final INodeFile file = INodeFile.valueOf(inode, path, true); + // not support appending file with striped blocks + if (file.isStriped()) { + throw new UnsupportedOperationException( + "Cannot append file with striped block " + src); + } BlockManager blockManager = fsd.getBlockManager(); final BlockStoragePolicy lpPolicy = blockManager .getStoragePolicy("LAZY_PERSIST"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index d624f84..9b08092 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@ -401,7 +401,7 @@ public class FSDirAttrOp { static BlockInfo[] 
unprotectedSetReplication( FSDirectory fsd, String src, short replication, short[] blockRepls) throws QuotaExceededException, UnresolvedLinkException, - SnapshotAccessControlException { + SnapshotAccessControlException, UnsupportedActionException { assert fsd.hasWriteLock(); final INodesInPath iip = fsd.getINodesInPath4Write(src, true); @@ -410,6 +410,11 @@ public class FSDirAttrOp { return null; } INodeFile file = inode.asFile(); + if (file.isStriped()) { + throw new UnsupportedActionException( + "Cannot set replication to a file with striped blocks"); + } + final short oldBR = file.getPreferredBlockReplication(); // before setFileReplication, check for increasing block replication. http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java index bb00130..7503272 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java @@ -143,6 +143,7 @@ class FSDirConcatOp { throw new HadoopIllegalArgumentException("concat: source file " + src + " is invalid or empty or underConstruction"); } + // source file's preferred block size cannot be greater than the target // file if (srcINodeFile.getPreferredBlockSize() > @@ -152,6 +153,11 @@ class FSDirConcatOp { + " which is greater than the target file's preferred block size " + targetINode.getPreferredBlockSize()); } + // TODO currently we do not support concatenating EC files + if (srcINodeFile.isStriped()) { + throw new HadoopIllegalArgumentException("concat: the src file " + src + + " is with striped blocks"); + } si.add(srcINodeFile); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java new file mode 100644 index 0000000..fd7ef33 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.io.erasurecode.ECSchema; + +/** + * Helper class to perform erasure coding related operations. + */ +final class FSDirErasureCodingOp { + + /** + * Private constructor for preventing FSDirErasureCodingOp object + * creation. Static-only class. + */ + private FSDirErasureCodingOp() {} + + /** + * Create an erasure coding zone on directory src. + * + * @param fsn namespace + * @param srcArg the path of a directory which will be the root of the + * erasure coding zone. The directory must be empty. + * @param schema ECSchema for the erasure coding zone + * @param cellSize Cell size of stripe + * @param logRetryCache whether to record RPC ids in editlog for retry + * cache rebuilding + * @return {@link HdfsFileStatus} + * @throws IOException + */ + static HdfsFileStatus createErasureCodingZone(final FSNamesystem fsn, + final String srcArg, final ECSchema schema, final int cellSize, + final boolean logRetryCache) throws IOException { + assert fsn.hasWriteLock(); + + String src = srcArg; + FSPermissionChecker pc = null; + byte[][] pathComponents = null; + pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + pc = fsn.getPermissionChecker(); + FSDirectory fsd = fsn.getFSDirectory(); + src = fsd.resolvePath(pc, src, pathComponents); + final INodesInPath iip; + List<XAttr> xAttrs; + fsd.writeLock(); + try { + iip = fsd.getINodesInPath4Write(src, false); + xAttrs = fsn.getErasureCodingZoneManager().createErasureCodingZone( + iip, schema, cellSize); + } finally { + fsd.writeUnlock(); + } + fsn.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); + return fsd.getAuditFileInfo(iip); + } + + /** + * Get the erasure coding zone information for specified path. + * + * @param fsn namespace + * @param src path + * @return {@link ErasureCodingZone} + * @throws IOException + */ + static ErasureCodingZone getErasureCodingZone(final FSNamesystem fsn, + final String src) throws IOException { + assert fsn.hasReadLock(); + + final INodesInPath iip = getINodesInPath(fsn, src); + return getErasureCodingZoneForPath(fsn, iip); + } + + /** + * Get erasure coding zone information for specified path. + * + * @param fsn namespace + * @param iip inodes in the path containing the file + * @return {@link ErasureCodingZone} + * @throws IOException + */ + static ErasureCodingZone getErasureCodingZone(final FSNamesystem fsn, + final INodesInPath iip) throws IOException { + assert fsn.hasReadLock(); + + return getErasureCodingZoneForPath(fsn, iip); + } + + /** + * Check if the file is in erasure coding zone. + * + * @param fsn namespace + * @param srcArg path + * @return true represents the file is in erasure coding zone, false otw + * @throws IOException + */ + static boolean isInErasureCodingZone(final FSNamesystem fsn, + final String srcArg) throws IOException { + assert fsn.hasReadLock(); + + final INodesInPath iip = getINodesInPath(fsn, srcArg); + return getErasureCodingSchemaForPath(fsn, iip) != null; + } + + /** + * Check if the file is in erasure coding zone. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index b69bb42..127474c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -185,6 +185,7 @@ class FSDirRenameOp {
     }
 
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    fsd.ecZoneManager.checkMoveValidity(srcIIP, dstIIP, src);
 
     // Ensure dst has quota to accommodate rename
     verifyFsLimitsForRename(fsd, srcIIP, dstIIP);
     verifyQuotaForRename(fsd, srcIIP, dstIIP);
@@ -357,6 +358,7 @@ class FSDirRenameOp {
     BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
 
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
+    fsd.ecZoneManager.checkMoveValidity(srcIIP, dstIIP, src);
     final INode dstInode = dstIIP.getLastINode();
     List<INodeDirectory> snapshottableDirs = new ArrayList<>();
     if (dstInode != null) { // Destination exists
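The rename paths now ask the EC zone manager the same question the encryption-zone manager already answers: does this rename cross a zone boundary? A rough sketch of such a check, comparing the zone roots of source and destination (hypothetical names; not the actual ErasureCodingZoneManager logic):

  import java.util.Map;

  // Hypothetical zone-boundary check in the spirit of checkMoveValidity.
  public class ZoneMoveCheck {
    // Returns the deepest zone root containing path, or null if none.
    static String zoneRootOf(Map<String, String> zones, String path) {
      String best = null;
      for (String root : zones.keySet()) {
        if (path.equals(root) || path.startsWith(root + "/")) {
          if (best == null || root.length() > best.length()) {
            best = root;
          }
        }
      }
      return best;
    }

    static void checkMoveValidity(Map<String, String> zones,
        String src, String dst) {
      String srcZone = zoneRootOf(zones, src);
      String dstZone = zoneRootOf(zones, dst);
      boolean sameZone = (srcZone == null)
          ? dstZone == null : srcZone.equals(dstZone);
      if (!sameZone) {
        throw new IllegalArgumentException(src + " cannot be moved between "
            + "zone " + srcZone + " and zone " + dstZone);
      }
    }
  }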
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 4a45074..c4cfd34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.io.Charsets;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -29,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -179,10 +182,12 @@ class FSDirStatAndListingOp {
 
     final FileEncryptionInfo feInfo = isReservedName ? null :
         fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
+    final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+        fsd.getFSNamesystem(), iip);
 
     final LocatedBlocks blocks = bm.createLocatedBlocks(
         inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
-        length, needBlockToken, iip.isSnapshot(), feInfo);
+        length, needBlockToken, iip.isSnapshot(), feInfo, ecZone);
 
     // Set caching information for the located blocks.
     for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@@ -374,7 +379,7 @@ class FSDirStatAndListingOp {
       if (fsd.getINode4DotSnapshot(srcs) != null) {
         return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
             HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
-            HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
+            HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, null, 0);
       }
       return null;
     }
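Note the null-handling idiom that accompanies each zone lookup in this file: a path outside any EC zone yields a null ErasureCodingZone, which the status-building code folds into a null schema and a cell size of 0. In isolation (ZoneInfo stands in for ErasureCodingZone):

  // Mirrors the "ecZone != null ? ... : ..." defaulting in the hunks
  // above; ZoneInfo is a stand-in, not an HDFS type.
  public class ZoneDefaults {
    static class ZoneInfo {
      final String schemaName;
      final int cellSize;
      ZoneInfo(String schemaName, int cellSize) {
        this.schemaName = schemaName;
        this.cellSize = cellSize;
      }
    }

    static String schemaOf(ZoneInfo zone) {
      return zone != null ? zone.schemaName : null;  // null => replicated
    }

    static int cellSizeOf(ZoneInfo zone) {
      return zone != null ? zone.cellSize : 0;       // 0 => no striping
    }
  }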
@@ -442,6 +447,11 @@ class FSDirStatAndListingOp {
 
     final FileEncryptionInfo feInfo = isRawPath ? null :
         fsd.getFileEncryptionInfo(node, snapshot, iip);
+    final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+        fsd.getFSNamesystem(), iip);
+    final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
+    final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
+
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -471,7 +481,9 @@
         node.getId(),
         childrenNum,
         feInfo,
-        storagePolicy);
+        storagePolicy,
+        schema,
+        cellSize);
   }
 
   private static INodeAttributes getINodeAttributes(
@@ -494,6 +506,8 @@ class FSDirStatAndListingOp {
     final boolean isEncrypted;
     final FileEncryptionInfo feInfo = isRawPath ? null :
         fsd.getFileEncryptionInfo(node, snapshot, iip);
+    final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+        fsd.getFSNamesystem(), iip);
     if (node.isFile()) {
       final INodeFile fileNode = node.asFile();
       size = fileNode.computeFileSize(snapshot);
@@ -507,7 +521,7 @@
 
       loc = fsd.getBlockManager().createLocatedBlocks(
           fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
-          inSnapshot, feInfo);
+          inSnapshot, feInfo, ecZone);
       if (loc == null) {
         loc = new LocatedBlocks();
       }
@@ -518,6 +532,8 @@
     }
     int childrenNum = node.isDirectory() ?
         node.asDirectory().getChildrenNum(snapshot) : 0;
+    final ECSchema schema = ecZone != null ? ecZone.getSchema() : null;
+    final int cellSize = ecZone != null ? ecZone.getCellSize() : 0;
 
     HdfsLocatedFileStatus status =
         new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
@@ -526,7 +542,8 @@
           getPermissionForFileStatus(nodeAttrs, isEncrypted),
           nodeAttrs.getUserName(), nodeAttrs.getGroupName(),
           node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-          node.getId(), loc, childrenNum, feInfo, storagePolicy);
+          node.getId(), loc, childrenNum, feInfo, storagePolicy, schema,
+          cellSize);
 
     // Set caching information for the located blocks.
     if (loc != null) {
       CacheManager cacheManager = fsd.getFSNamesystem().getCacheManager();
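A small but deliberate detail in createLocatedFileStatus above: when the block manager yields no located blocks, the code substitutes an empty LocatedBlocks instead of passing null onward, so downstream consumers such as the caching loop need only a single null guard. The same never-return-null idiom in isolation (a List stands in for LocatedBlocks):

  import java.util.Collections;
  import java.util.List;

  // Empty-object fallback sketch; locateBlocks is a hypothetical helper.
  public class EmptyFallback {
    static List<String> locateBlocks(List<String> fetched) {
      // Hand back an immutable empty list rather than null.
      return (fetched != null) ? fetched : Collections.emptyList();
    }
  }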
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 474c257..5d2bf09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -86,6 +87,12 @@ final class FSDirTruncateOp {
     final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
         .getStoragePolicy("LAZY_PERSIST");
 
+    // truncating a file with striped blocks is not supported
+    if (file.isStriped()) {
+      throw new UnsupportedOperationException(
+          "Cannot truncate file with striped block " + src);
+    }
+
     if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
       throw new UnsupportedOperationException(
           "Cannot truncate lazy persist file " + src);
@@ -214,10 +221,12 @@
         file.getFileUnderConstructionFeature().getClientName(), file.getId());
     boolean shouldRecoverNow = (newBlock == null);
     BlockInfo oldBlock = file.getLastBlock();
+    Preconditions.checkState(!oldBlock.isStriped());
     boolean shouldCopyOnTruncate = shouldCopyOnTruncate(fsn, file, oldBlock);
     if (newBlock == null) {
-      newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock() : new Block(
-          oldBlock.getBlockId(), oldBlock.getNumBytes(),
+      newBlock = (shouldCopyOnTruncate) ?
+          fsn.createNewBlock(file.isStriped()) :
+          new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
           fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock(
               oldBlock)));
     }
@@ -231,7 +240,8 @@
           file.getPreferredBlockReplication());
       truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
       truncatedBlockUC.setTruncateBlock(oldBlock);
-      file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+      file.convertLastBlockToUC(truncatedBlockUC,
+          blockManager.getStorages(oldBlock));
       blockManager.addBlockCollection(truncatedBlockUC, file);
 
       NameNode.stateChangeLog.debug(
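The truncate patch layers two defenses: a user-facing UnsupportedOperationException at the entry point, and an internal Preconditions.checkState assertion immediately before recovery logic that only understands contiguous blocks. Guava's Preconditions is real; the surrounding types in this sketch are stand-ins:

  import com.google.common.base.Preconditions;

  // Two-level defense: reject at the API boundary, then re-assert the
  // invariant where contiguous-only logic begins. BlockLike is a
  // stand-in, not an HDFS class.
  public class TruncateGuards {
    interface BlockLike { boolean isStriped(); }

    static void truncate(boolean fileIsStriped, BlockLike lastBlock,
        String src) {
      // 1) User-facing rejection at the entry point.
      if (fileIsStriped) {
        throw new UnsupportedOperationException(
            "Cannot truncate file with striped block " + src);
      }
      // 2) Internal invariant: only contiguous blocks reach this point.
      Preconditions.checkState(!lastBlock.isStriped());
      // ... contiguous-only truncate/recovery logic would follow ...
    }
  }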